threat matching methods #82

Merged (1 commit, Mar 19, 2020)
78 changes: 55 additions & 23 deletions README.md
@@ -1,6 +1,8 @@
# pytm: A Pythonic framework for threat modeling

Define your system in Python using the elements and properties described in the pytm framework.
Based on your definition, pytm can generate a Data Flow Diagram (DFD), a Sequence Diagram
and, most important of all, threats to your system.

## Requirements

@@ -52,29 +54,11 @@ Element

```

## Model

The following is a sample `tm.py` file that describes a simple application where a User logs into the application
and posts comments on the app. The app server stores those comments in the database, and an AWS Lambda
periodically cleans the database.

```python

@@ -133,6 +117,8 @@ tm.process()

```
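Most of the sample model is collapsed in this diff view. The following is a minimal sketch of such a model, assuming only constructors that this PR's tests also exercise; the element names and flows are illustrative, not the README's exact sample:

```python
#!/usr/bin/env python3
# Illustrative model: a user posts comments, the server stores them in a
# database, and a Lambda periodically cleans the database.
from pytm.pytm import TM, Actor, Boundary, Dataflow, Datastore, Lambda, Server

tm = TM("Simple comment app")

internet = Boundary("Internet")
cloud = Boundary("Cloud")

user = Actor("User", inBoundary=internet)
server = Server("App server")
db = Datastore("Comments DB", inBoundary=cloud)
cleaner = Lambda("DB cleaner", inBoundary=cloud)

Dataflow(user, server, "User posts a comment")
Dataflow(server, db, "Server stores the comment")
Dataflow(cleaner, db, "Lambda cleans old comments")

tm.process()  # evaluate threats; CLI flags (--dfd, --seq, --report) select the output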

### Diagrams

Diagrams are output as [Dot](https://graphviz.gitlab.io/) and [PlantUML](https://plantuml.com/).

When the `--dfd` argument is passed to the above `tm.py` file, it generates output on stdout, which is fed to Graphviz's dot to generate the Data Flow Diagram:
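The exact command is collapsed in this diff view; a plausible invocation, assuming Graphviz's `dot` is installed (the output filename is illustrative):

```bash
# Generate the DFD and render it to PNG with Graphviz (illustrative filename).
./tm.py --dfd | dot -Tpng -o dfd.png
```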
@@ -160,6 +146,8 @@ Generates this diagram:

![seq.png](.gitbook/assets/seq.png)

### Report

The diagrams and findings can be included in the template to create a final report:

```bash
@@ -196,6 +184,50 @@ Name|From|To |Data|Protocol|Port

```
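The command itself is collapsed above; a plausible invocation, assuming a Markdown template and pandoc for conversion (both the template path and the pandoc step are illustrative):

```bash
# Fill the report template with findings, then convert to HTML (illustrative paths).
./tm.py --report docs/template.md | pandoc -f markdown -t html > report.html
```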

## Threats database

As a security practitioner, you may supply your own threats file by setting `TM.threatsFile`. It should contain entries like:

```json
{
    "SID": "INP01",
    "target": ["Lambda", "Process"],
    "description": "Buffer Overflow via Environment Variables",
    "details": "This attack pattern involves causing a buffer overflow through manipulation of environment variables. Once the attacker finds that they can modify an environment variable, they may try to overflow associated buffers. This attack leverages implicit trust often placed in environment variables.",
    "Likelihood Of Attack": "High",
    "severity": "High",
    "condition": "target.usesEnvironmentVariables is True and target.sanitizesInput is False and target.checksInputBounds is False",
    "prerequisites": "The application uses environment variables. An environment variable exposed to the user is vulnerable to a buffer overflow. The vulnerable environment variable uses untrusted data. Tainted data used in the environment variables is not properly validated. For instance, boundary checking is not done before copying the input data to a buffer.",
    "mitigations": "Do not expose environment variables to the user. Do not use untrusted data in your environment variables. Use a language or compiler that performs automatic bounds checking. There are tools such as Sharefuzz [R.10.3], an environment variable fuzzer for Unix that supports loading a shared library. You can use Sharefuzz to determine if you are exposing an environment variable vulnerable to buffer overflow.",
    "example": "Attack Example: Buffer Overflow in $HOME. A buffer overflow in sccw allows local users to gain root access via the $HOME environment variable. Attack Example: Buffer Overflow in TERM. A buffer overflow in the rlogin program involves its consumption of the TERM environment variable.",
    "references": "https://capec.mitre.org/data/definitions/10.html, CVE-1999-0906, CVE-1999-0046, http://cwe.mitre.org/data/definitions/120.html, http://cwe.mitre.org/data/definitions/119.html, http://cwe.mitre.org/data/definitions/680.html"
}
```
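A minimal sketch of wiring in a custom file (the path is hypothetical; by default pytm uses its bundled threat database):

```python
from pytm.pytm import TM

tm = TM("my model")
tm.threatsFile = "my_threats.json"  # hypothetical path to a custom threats file
```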

The `target` field lists the classes of model elements to match this threat against.
These can be assets: Actor, Datastore, Server, Process, SetOfProcesses, ExternalEntity,
Lambda, or Element, the base class that matches any element. The target can also be a Dataflow that connects two assets.

All other fields (except `condition`) are available for display and can be used in the template
to list findings in the final [report](#report).

> **WARNING**
>
> The `threats.json` file contains strings that are run through `eval()`. Make sure the file has
> restrictive permissions, or an attacker able to modify those strings can cause you to run code on their behalf.

The logic lives in the `condition`, where members of `target` can be logically evaluated.
If the condition evaluates to true, the rule generates a finding; otherwise it does not.
A condition may compare attributes of `target` and may also call any of these methods:

* `target.oneOf(class, ...)` where `class` is one or more: Actor, Datastore, Server, Process, SetOfProcesses, ExternalEntity, Lambda or Dataflow,
* `target.crosses(Boundary)`,
* `target.enters(Boundary)`,
* `target.exits(Boundary)`,
* `target.inside(Boundary)`.

If `target` is a Dataflow, remember you can access `target.source` and/or `target.sink` along with other attributes.
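For example, a hypothetical rule (not part of the shipped threat database; only the relevant fields are shown) could flag unencrypted dataflows that cross a trust boundary:

```json
{
    "SID": "EX01",
    "target": ["Dataflow"],
    "description": "Unencrypted dataflow crosses a trust boundary",
    "condition": "target.crosses(Boundary) and target.isEncrypted is False"
}
```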

## Currently supported threats

```text
…
```
72 changes: 66 additions & 6 deletions pytm/pytm.py
@@ -1,9 +1,12 @@
import argparse
import inspect
import json
import logging
import random
import sys
import uuid
from collections import defaultdict
from collections.abc import Iterable
from hashlib import sha224
from os.path import dirname
from re import match
@@ -178,13 +181,23 @@ def __init__(self, json_read):
        self.example = json_read['example']
        self.references = json_read['references']

        # Normalize `target` to a tuple of classes so apply() can use isinstance().
        if not isinstance(self.target, str) and isinstance(self.target, Iterable):
            self.target = tuple(self.target)
        else:
            self.target = (self.target,)
        self.target = tuple(getattr(sys.modules[__name__], x) for x in self.target)

    def __repr__(self):
        return "<{0}.{1}({2}) at {3}>".format(
            self.__module__, type(self).__name__, self.id, hex(id(self))
        )

    def __str__(self):
        return "{0}({1})".format(type(self).__name__, self.id)

    def apply(self, target):
        # Class matching is a plain isinstance() check against the tuple
        # built in __init__; the condition string is then evaluated with
        # `target` in scope.
        if not isinstance(target, self.target):
            return None
        return eval(self.condition)


@@ -384,6 +397,53 @@ def _safeset(self, attr, value):
        except ValueError:
            pass

    def oneOf(self, *elements):
        # Matches if self is an instance of any given class, or is one of
        # the given element instances.
        for element in elements:
            if inspect.isclass(element):
                if isinstance(self, element):
                    return True
            elif self is element:
                return True
        return False

    def crosses(self, *boundaries):
        # A flow crosses when its endpoints do not share a boundary;
        # boundaries may be given as classes or as specific instances.
        if self.source.inBoundary is self.sink.inBoundary:
            return False
        for boundary in boundaries:
            if inspect.isclass(boundary):
                if (
                    (
                        isinstance(self.source.inBoundary, boundary)
                        and not isinstance(self.sink.inBoundary, boundary)
                    )
                    or (
                        not isinstance(self.source.inBoundary, boundary)
                        and isinstance(self.sink.inBoundary, boundary)
                    )
                    or self.source.inBoundary is not self.sink.inBoundary
                ):
                    return True
            elif (self.source.inside(boundary) and not self.sink.inside(boundary)) or (
                not self.source.inside(boundary) and self.sink.inside(boundary)
            ):
                return True
        return False

    def enters(self, *boundaries):
        # Source is outside any boundary, sink is inside one of the given ones.
        return self.source.inBoundary is None and self.sink.inside(*boundaries)

    def exits(self, *boundaries):
        # Source is inside one of the given boundaries, sink is outside any.
        return self.source.inside(*boundaries) and self.sink.inBoundary is None

    def inside(self, *boundaries):
        # Matches if this element's enclosing boundary is an instance of,
        # or is, one of the given boundaries.
        for boundary in boundaries:
            if inspect.isclass(boundary):
                if isinstance(self.inBoundary, boundary):
                    return True
            elif self.inBoundary is boundary:
                return True
        return False


class Lambda(Element):
    onAWS = varBool(True)
53 changes: 52 additions & 1 deletion tests/test_private_func.py
@@ -3,7 +3,7 @@
import unittest
import random

from pytm.pytm import Actor, Boundary, Dataflow, Datastore, Server, TM, Threat


class TestUniqueNames(unittest.TestCase):
@@ -129,3 +129,54 @@ def test_defaults(self):
        self.assertEqual(resp_post.isEncrypted, server.isEncrypted)
        self.assertEqual(resp_post.protocol, server.protocol)
        self.assertEqual(resp_post.data, server.data)


class TestMethod(unittest.TestCase):

    def test_defaults(self):
        internet = Boundary("Internet")
        cloud = Boundary("Cloud")
        user = Actor("User", inBoundary=internet)
        server = Server("Server")
        db = Datastore("DB", inBoundary=cloud)
        func = Datastore("Lambda function", inBoundary=cloud)
        request = Dataflow(user, server, "request")
        response = Dataflow(server, user, "response")
        user_query = Dataflow(user, db, "user query")
        server_query = Dataflow(server, db, "server query")
        func_query = Dataflow(func, db, "func query")

        default = {
            "SID": "",
            "description": "",
            "condition": "",
            "target": ["Actor", "Boundary", "Dataflow", "Datastore", "Server"],
            "details": "",
            "severity": "",
            "mitigations": "",
            "example": "",
            "references": "",
        }
        testCases = [
            {"target": server, "condition": "target.oneOf(Server, Datastore)"},
            {"target": server, "condition": "not target.oneOf(Actor, Dataflow)"},
            {"target": request, "condition": "target.crosses(Boundary)"},
            {"target": user_query, "condition": "target.crosses(Boundary)"},
            {"target": server_query, "condition": "target.crosses(Boundary)"},
            {"target": func_query, "condition": "not target.crosses(Boundary)"},
            {"target": func_query, "condition": "not target.enters(Boundary)"},
            {"target": func_query, "condition": "not target.exits(Boundary)"},
            {"target": request, "condition": "not target.enters(Boundary)"},
            {"target": request, "condition": "target.exits(Boundary)"},
            {"target": response, "condition": "target.enters(Boundary)"},
            {"target": response, "condition": "not target.exits(Boundary)"},
            {"target": user, "condition": "target.inside(Boundary)"},
        ]
        for case in testCases:
            t = Threat({**default, **{"condition": case["condition"]}})
            self.assertTrue(
                t.apply(case["target"]),
                "Failed to match {} against {}".format(
                    case["target"], case["condition"]
                ),
            )