From ad98eb203683916048915fbdf90e31797dd60222 Mon Sep 17 00:00:00 2001
From: Daniel Kim
Date: Mon, 9 Nov 2020 12:34:26 -0500
Subject: [PATCH 1/4] Initial groundwork to create operation heatmap JSON for
ATT&CK navigator
---
app/compass_svc.py | 148 +++++++++++++++++++++++++++++++++++++++--
hook.py | 3 +-
static/js/compass.js | 23 ++++++-
templates/compass.html | 19 +++++-
4 files changed, 182 insertions(+), 11 deletions(-)
diff --git a/app/compass_svc.py b/app/compass_svc.py
index f35039d..2384025 100644
--- a/app/compass_svc.py
+++ b/app/compass_svc.py
@@ -3,13 +3,13 @@
from aiohttp import web
from aiohttp_jinja2 import template
+from collections import defaultdict
from app.service.auth_svc import for_all_public_methods, check_authorization
@for_all_public_methods(check_authorization)
class CompassService:
-
def __init__(self, services):
self.services = services
self.auth_svc = self.services.get('auth_svc')
@@ -19,10 +19,14 @@ def __init__(self, services):
@template('compass.html')
async def splash(self, request):
adversaries = [a.display for a in await self.data_svc.locate('adversaries')]
- return dict(adversaries=sorted(adversaries, key=lambda a: a['name']))
+ operations = [o.display for o in await self.data_svc.locate('operations')]
+ return dict(
+ adversaries=sorted(adversaries, key=lambda a: a['name']),
+ operations=operations,
+ )
@staticmethod
- def _get_layer_boilerplate(name, description):
+ def _get_adversary_layer_boilerplate(name, description):
return dict(
version='3.0',
name=name,
@@ -44,6 +48,55 @@ def _get_layer_boilerplate(name, description):
)
)
+ @staticmethod
+ def _get_operation_layer_boilerplate(name, description):
+ return dict(
+ name=name,
+ versions=dict(
+ attack="8",
+ navigator="4.0",
+ layer="4.0",
+ ),
+ domain="enterprise-attack",
+ description=description,
+ sorting=0,
+ hideDisabled=False,
+ techniques=[],
+ gradient=dict(
+ colors=[
+ "#bdbdbd",
+ "#fc3b3b",
+ "#fd8d3c",
+ "#31a354"
+ ],
+ minValue=0,
+ maxValue=3
+ ),
+ legendItems=[
+ {
+ "label": "All ran procedures succeeded",
+ "color": "#31a354"
+ },
+ {
+ "label": "None of the ran procedures succeeded",
+ "color": "#fc3b3b"
+ },
+ {
+ "label": "Some of the ran procedures succeeded.",
+ "color": "#fd8d3c"
+ },
+ {
+ "label": "None of the procedures ran.",
+ "color": "#bdbdbd"
+ }
+ ],
+ metadata=[],
+ showTacticRowBackground=False,
+ tacticRowBackground="#dddddd",
+ selectTechniquesAcrossTactics=True,
+ selectSubtechniquesWithParent=False,
+ )
+
async def _get_all_abilities(self):
return 'All-Abilities', 'full set of techniques available', [ability.display for ability in await self.services.get('data_svc').locate('abilities')]
@@ -51,7 +104,7 @@ async def _get_adversary_abilities(self, request_body):
adversary = (await self.rest_svc.display_objects(object_name='adversaries', data=dict(adversary_id=request_body.get('adversary_id'))))[0]
return adversary['name'], adversary['description'], adversary['atomic_ordering']
- async def generate_layer(self, request):
+ async def generate_adversary_layer(self, request):
request_body = json.loads(await request.read())
ability_functions = dict(
@@ -60,7 +113,7 @@ async def generate_layer(self, request):
)
display_name, description, abilities = await ability_functions[request_body['index']](request_body)
- layer = self._get_layer_boilerplate(name=display_name, description=description)
+ layer = self._get_adversary_layer_boilerplate(name=display_name, description=description)
for ability in abilities:
technique = dict(
techniqueID=ability['technique_id'],
@@ -75,6 +128,91 @@ async def generate_layer(self, request):
return web.json_response(layer)
+ async def generate_operation_layer(self, request):
+ request_body = json.loads(await request.read())
+ operation = (await self.data_svc.locate('operations', match=dict(id=int(request_body.get('id')))))[0]
+ display_name = operation.name
+ description = 'Operation {name} was conducted using adversary profile {adv_profile}'.format(
+ name=operation.name,
+ adv_profile=operation.adversary.name
+ )
+ layer = self._get_operation_layer_boilerplate(name=display_name, description=description)
+ success_counter = defaultdict(int)
+ technique_counter = defaultdict(int)
+ no_success_counter = defaultdict(int)
+ skipped_counter = defaultdict(int)
+ technique_tactic_map = dict()
+ technique_dicts = dict() # Map technique ID to corresponding dict object.
+ for link in operation.chain:
+ technique_id = link.ability.technique_id
+ technique_counter[technique_id] += 1
+ technique_tactic_map[technique_id] = link.ability.tactic
+ if link.status == 0:
+ success_counter[technique_id] += 1
+ elif link.status in (1, 124, -3, -4, -5):
+ # Did not succeed if status was failure,
+ # timeout, collected, untrusted, or visibility.
+ no_success_counter[technique_id] += 1
+ else:
+ # Ability either queued or discarded.
+ skipped_counter[technique_id] += 1
+
+ for technique_id, num_procedures in technique_counter.items():
+ # case 1: all ran procedures succeeded
+ # case 2: all ran procedures failed
+ # case 3: none of the procedures ran
+ # case 4: some procedures ran, but not all of them succeeded
+
+ # Default case: none of the procedures for this technique were run.
+ score = 0
+ color = '#bdbdbd'
+ if skipped_counter[technique_id] < num_procedures:
+ if success_counter[technique_id] == 0:
+ # None of the procedures that ran for this technique succeeded
+ score = 1
+ color = '#fc3b3b'
+ elif no_success_counter[technique_id] == 0:
+ # All of the procedures that ran for this technique succeeded.
+ score = 3
+ color = '#31a354'
+ else:
+ # Some of the procedures that ran failed
+ score = 2
+ color = '#fd8d3c'
+ technique_dicts[technique_id] = dict(
+ techniqueID=technique_id,
+ tactic=technique_tactic_map[technique_id],
+ score=score,
+ color=color,
+ comment='',
+ enabled=True,
+ metadata=[],
+ showSubtechniques=False,
+ )
+
+ for technique_id, num_procedures in technique_counter.items():
+ # Check if we need to expand the parent technique.
+ if '.' in technique_id:
+ parent_id = technique_id.split('.')[0]
+
+ # Check if the parent technique was already processed
+ if parent_id in technique_dicts:
+ technique_dicts.get(parent_id)['showSubtechniques'] = True
+ else:
+ technique_dicts[parent_id] = dict(
+ techniqueID=parent_id,
+ tactic=technique_tactic_map[technique_id],
+ color='',
+ comment='',
+ enabled=True,
+ metadata=[],
+ showSubtechniques=True,
+ )
+
+ for _, technique_dict in technique_dicts.items():
+ layer['techniques'].append(technique_dict)
+ return web.json_response(layer)
+
@staticmethod
def _extract_techniques(request_body):
techniques = request_body.get('techniques')
diff --git a/hook.py b/hook.py
index 79d25f6..482f73f 100644
--- a/hook.py
+++ b/hook.py
@@ -9,6 +9,7 @@ async def enable(services):
app = services.get('app_svc').application
compass_svc = CompassService(services)
app.router.add_static('/compass', 'plugins/compass/static/', append_version=True)
- app.router.add_route('POST', '/plugin/compass/layer', compass_svc.generate_layer)
+ app.router.add_route('POST', '/plugin/compass/adversarylayer', compass_svc.generate_adversary_layer)
+ app.router.add_route('POST', '/plugin/compass/operationlayer', compass_svc.generate_operation_layer)
app.router.add_route('POST', '/plugin/compass/adversary', compass_svc.create_adversary_from_layer)
app.router.add_route('GET', '/plugin/compass/gui', compass_svc.splash)
diff --git a/static/js/compass.js b/static/js/compass.js
index 71c48b9..0a4a678 100644
--- a/static/js/compass.js
+++ b/static/js/compass.js
@@ -1,4 +1,4 @@
-function generateLayer() {
+function generateAdversaryLayer() {
function downloadObjectAsJson(data){
let exportName = 'layer';
let dataStr = "data:text/json;charset=utf-8," + encodeURIComponent(JSON.stringify(data, null, 2));
@@ -12,7 +12,26 @@ function generateLayer() {
let selectionAdversaryID = $('#layer-selection-adversary option:selected').attr('value');
let postData = selectionAdversaryID ? {'index':'adversary', 'adversary_id': selectionAdversaryID} : {'index': 'all'};
- restRequest('POST', postData, downloadObjectAsJson, '/plugin/compass/layer');
+ restRequest('POST', postData, downloadObjectAsJson, '/plugin/compass/adversarylayer');
+}
+
+function generateOperationLayer() {
+ function downloadObjectAsJson(data){
+ let exportName = 'operation_layer';
+ let dataStr = "data:text/json;charset=utf-8," + encodeURIComponent(JSON.stringify(data, null, 2));
+ let downloadAnchorNode = document.createElement('a');
+ downloadAnchorNode.setAttribute("href", dataStr);
+ downloadAnchorNode.setAttribute("download", exportName + ".json");
+ document.body.appendChild(downloadAnchorNode); // required for firefox
+ downloadAnchorNode.click();
+ downloadAnchorNode.remove();
+ }
+
+ let selectionOperationID = $('#layer-selection-operation option:selected').attr('value');
+ if (selectionOperationID) {
+ let postData = {'index':'operation', 'id': selectionOperationID};
+ restRequest('POST', postData, downloadObjectAsJson, '/plugin/compass/operationlayer');
+ }
}
function uploadAdversaryLayerButtonFileUpload() {
diff --git a/templates/compass.html b/templates/compass.html
index 34fa491..d7da7b0 100644
--- a/templates/compass.html
+++ b/templates/compass.html
@@ -10,7 +10,7 @@ find your way
upload the layer file to generate an adversary to use in an operation
-
Generate Layer
+
Generate Adversary Layer
-
+
+
+
+
Generate Operation Layer
+
+
+
+
Generate Adversary
From 52d4257f65545e43abcf97ccf1873aa84c8cd18d Mon Sep 17 00:00:00 2001
From: Daniel Kim
Date: Tue, 10 Nov 2020 08:49:28 -0500
Subject: [PATCH 2/4] Use dict to fetch colors
---
app/compass_svc.py | 29 ++++++++++++++++-------------
1 file changed, 16 insertions(+), 13 deletions(-)
diff --git a/app/compass_svc.py b/app/compass_svc.py
index 2384025..3354e5d 100644
--- a/app/compass_svc.py
+++ b/app/compass_svc.py
@@ -7,6 +7,13 @@
from app.service.auth_svc import for_all_public_methods, check_authorization
+_technique_colors = dict(
+ success='#44AA99', # Green
+ partial_success='#FFB000', # Orange
+ failure='#CC3311', # Red
+ not_run='#555555', # Dark grey
+)
+
@for_all_public_methods(check_authorization)
class CompassService:
@@ -64,10 +71,10 @@ def _get_operation_layer_boilerplate(name, description):
techniques=[],
gradient=dict(
colors=[
- "#bdbdbd",
- "#fc3b3b",
- "#fd8d3c",
- "#31a354"
+ _technique_colors['not_run'],
+ _technique_colors['failure'],
+ _technique_colors['partial_success'],
+ _technique_colors['success'],
],
minValue=0,
maxValue=3
@@ -75,19 +82,19 @@ def _get_operation_layer_boilerplate(name, description):
legendItems=[
{
"label": "All ran procedures succeeded",
- "color": "#31a354"
+ "color": _technique_colors['success']
},
{
"label": "None of the ran procedures succeeded",
- "color": "#fc3b3b"
+ "color": _technique_colors['failure']
},
{
"label": "Some of the ran procedures succeeded.",
- "color": "#fd8d3c"
+ "color": _technique_colors['partial_success']
},
{
"label": "None of the procedures ran.",
- "color": "#bdbdbd"
+ "color": _technique_colors['not_run']
}
],
metadata=[],
@@ -165,25 +172,21 @@ async def generate_operation_layer(self, request):
# Default case: none of the procedures for this technique were run.
score = 0
- color = '#bdbdbd'
if skipped_counter[technique_id] < num_procedures:
if success_counter[technique_id] == 0:
# None of the procedures that ran for this technique succeeded
score = 1
- color = '#fc3b3b'
elif no_success_counter[technique_id] == 0:
# All of the procedures that ran for this technique succeeded.
score = 3
- color = '#31a354'
else:
# Some of the procedures that ran failed
score = 2
- color = '#fd8d3c'
technique_dicts[technique_id] = dict(
techniqueID=technique_id,
tactic=technique_tactic_map[technique_id],
score=score,
- color=color,
+ color='',
comment='',
enabled=True,
metadata=[],
From 57da9ffa1572d685a64909f844fa42111263ddd1 Mon Sep 17 00:00:00 2001
From: Daniel Kim
Date: Wed, 11 Nov 2020 13:07:01 -0500
Subject: [PATCH 3/4] Enums for matrix colors, and including skipped abilities
in the operation
---
app/compass_svc.py | 83 ++++++++++++++++++++++++++++++++--------------
1 file changed, 59 insertions(+), 24 deletions(-)
diff --git a/app/compass_svc.py b/app/compass_svc.py
index 3354e5d..bc027c6 100644
--- a/app/compass_svc.py
+++ b/app/compass_svc.py
@@ -4,15 +4,16 @@
from aiohttp import web
from aiohttp_jinja2 import template
from collections import defaultdict
+from enum import Enum
from app.service.auth_svc import for_all_public_methods, check_authorization
-_technique_colors = dict(
- success='#44AA99', # Green
- partial_success='#FFB000', # Orange
- failure='#CC3311', # Red
- not_run='#555555', # Dark grey
-)
+
+class _HeatmapColors(Enum):
+ SUCCESS = '#44AA99' # Green
+ PARTIAL_SUCCESS = '#FFB000' # Orange
+ FAILURE = '#CC3311' # Red
+ NOT_RUN = '#555555' # Dark grey
@for_all_public_methods(check_authorization)
@@ -66,15 +67,29 @@ def _get_operation_layer_boilerplate(name, description):
),
domain="enterprise-attack",
description=description,
+ filters={
+ "platforms": [
+ "Linux",
+ "macOS",
+ "Windows",
+ "Office 365",
+ "Azure AD",
+ "AWS",
+ "GCP",
+ "Azure",
+ "SaaS",
+ "Network"
+ ]
+ },
sorting=0,
hideDisabled=False,
techniques=[],
gradient=dict(
colors=[
- _technique_colors['not_run'],
- _technique_colors['failure'],
- _technique_colors['partial_success'],
- _technique_colors['success'],
+ _HeatmapColors.NOT_RUN.value,
+ _HeatmapColors.FAILURE.value,
+ _HeatmapColors.PARTIAL_SUCCESS.value,
+ _HeatmapColors.SUCCESS.value,
],
minValue=0,
maxValue=3
@@ -82,19 +97,19 @@ def _get_operation_layer_boilerplate(name, description):
legendItems=[
{
"label": "All ran procedures succeeded",
- "color": _technique_colors['success']
+ "color": _HeatmapColors.SUCCESS.value
},
{
"label": "None of the ran procedures succeeded",
- "color": _technique_colors['failure']
+ "color": _HeatmapColors.FAILURE.value
},
{
- "label": "Some of the ran procedures succeeded.",
- "color": _technique_colors['partial_success']
+ "label": "Some of the ran procedures succeeded",
+ "color": _HeatmapColors.PARTIAL_SUCCESS.value
},
{
- "label": "None of the procedures ran.",
- "color": _technique_colors['not_run']
+ "label": "All procedures skipped",
+ "color": _HeatmapColors.NOT_RUN.value
}
],
metadata=[],
@@ -149,19 +164,39 @@ async def generate_operation_layer(self, request):
no_success_counter = defaultdict(int)
skipped_counter = defaultdict(int)
technique_tactic_map = dict()
- technique_dicts = dict() # Map technique ID to corresponding dict object.
+ technique_dicts = dict() # Map technique ID to corresponding dict object.
+ to_process = [] # list of (ability, status) tuples
+
+ # Get links from operation chain
for link in operation.chain:
- technique_id = link.ability.technique_id
+ to_process.append((link.ability, link.status))
+
+ # Get automatically skipped links
+ skipped_abilities = await operation.get_skipped_abilities_by_agent(self.data_svc)
+ for skipped_by_agent in skipped_abilities:
+ for _, skipped in skipped_by_agent.items():
+ for skipped_info in skipped:
+ status = skipped_info.get('reason_id')
+ ability_id = skipped_info.get('ability_id')
+ if status is not None:
+ ability = (await self.data_svc.locate('abilities', match=dict(ability_id=ability_id)))[0]
+ if ability:
+ to_process.append((ability, status))
+
+ # Count success, failures, no-runs for links.
+ for (ability, status) in to_process:
+ technique_id = ability.technique_id
technique_counter[technique_id] += 1
- technique_tactic_map[technique_id] = link.ability.tactic
- if link.status == 0:
+ technique_tactic_map[technique_id] = ability.tactic
+ if status == 0:
success_counter[technique_id] += 1
- elif link.status in (1, 124, -3, -4, -5):
- # Did not succeed if status was failure,
- # timeout, collected, untrusted, or visibility.
+ elif status in (1, 124, -3, -4):
+ # Did not succeed if status was failure, timeout, untrusted, or collected (in the
+ # case of collected/untrusted/timeout, the command may have run successfully, but
+ # we don't know for sure due to lack of timely response from the agent).
no_success_counter[technique_id] += 1
else:
- # Ability either queued or discarded.
+ # Ability either queued, manually discarded, or visibility threshold was surpassed.
skipped_counter[technique_id] += 1
for technique_id, num_procedures in technique_counter.items():
From 21234d3548312b489a05290f6a0e854bc20775d2 Mon Sep 17 00:00:00 2001
From: Daniel Kim
Date: Thu, 12 Nov 2020 11:54:39 -0500
Subject: [PATCH 4/4] deconflicting status IDs
---
app/compass_svc.py | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/app/compass_svc.py b/app/compass_svc.py
index bc027c6..5a513f1 100644
--- a/app/compass_svc.py
+++ b/app/compass_svc.py
@@ -176,9 +176,10 @@ async def generate_operation_layer(self, request):
for skipped_by_agent in skipped_abilities:
for _, skipped in skipped_by_agent.items():
for skipped_info in skipped:
- status = skipped_info.get('reason_id')
+ skipped_reason = skipped_info.get('reason_id')
ability_id = skipped_info.get('ability_id')
- if status is not None:
+ if skipped_reason is not None:
+ status = 10 + int(skipped_reason) # skipped_reason can be in range [0, 5]
ability = (await self.data_svc.locate('abilities', match=dict(ability_id=ability_id)))[0]
if ability:
to_process.append((ability, status))
@@ -196,7 +197,7 @@ async def generate_operation_layer(self, request):
# we don't know for sure due to lack of timely response from the agent).
no_success_counter[technique_id] += 1
else:
- # Ability either queued, manually discarded, or visibility threshold was surpassed.
+ # Ability either queued, skipped, manually discarded, or visibility threshold was surpassed.
skipped_counter[technique_id] += 1
for technique_id, num_procedures in technique_counter.items():