[ASM] - Expandr 4735 #27624

Merged
6 changes: 6 additions & 0 deletions Packs/CortexAttackSurfaceManagement/ReleaseNotes/1_6_20.md
@@ -0,0 +1,6 @@

#### Scripts

##### RankServiceOwners

- Updated the script to return a high-confidence set of most likely owners based on their relative ranking scores.
RankServiceOwners.py
@@ -5,9 +5,10 @@
"""


from typing import Dict, List, Any
from typing import Dict, List, Any, Iterable
import traceback
from itertools import groupby
import math

STRING_DELIMITER = ' | ' # delimiter used for joining Source fields and any additional fields of type string

@@ -29,9 +30,13 @@ def score(owners: List[Dict[str, Any]]) -> List[Dict[str, Any]]:

def rank(owners: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Sort owners by ranking score
Sort owners by ranking score and use a data-driven algorithm to return the top k,
where k is a dynamic value based on the relative scores

See _get_k for hyperparameters that can be used to adjust the target value of k
"""
return sorted(owners, key=lambda x: x['Ranking Score'], reverse=True)
k = _get_k(scores=(owner['Ranking Score'] for owner in owners))
return sorted(owners, key=lambda x: x['Ranking Score'], reverse=True)[:k]
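For illustration only (not part of the diff), a minimal sketch of the new behavior, using hypothetical owner records abbreviated to the one field rank() actually reads; the cutoff shown follows from _get_k's default hyperparameters:

```python
# Hypothetical, abbreviated owner records; rank() only reads 'Ranking Score'.
owners = [
    {'Name': 'a', 'Ranking Score': 10},
    {'Name': 'b', 'Ranking Score': 10},
    {'Name': 'c', 'Ranking Score': 1},
    {'Name': 'd', 'Ranking Score': 1},
    {'Name': 'e', 'Ranking Score': 1},
]
# Previously rank() returned all five owners sorted by score. With this change,
# _get_k picks k=2 here: the two 10s already cover >= 75% of the score mass and
# the drop to 1 is larger than a_tol, so only 'a' and 'b' are returned.
assert [o['Name'] for o in rank(owners)] == ['a', 'b']
```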


def justify(owners: List[Dict[str, str]]) -> List[Dict[str, str]]:
@@ -139,6 +144,54 @@ def aggregate(owners: List[Dict[str, str]]) -> List[Dict[str, Any]]:
return deduped


def _get_k(
scores: Iterable[float],
target_k: int = 5,
k_tol: int = 2,
a_tol: float = 1.0,
min_score_proportion: float = 0.75
Comment on lines +149 to +153


can you defensively verify that these values are within expected bounds? (for instance, that k_tol isn't -3.)

) -> int:
"""
Return a value of k such that:
- k <= target_k + k_tol
- the top k scores cover at least the specified minimum proportion of the total score mass, unless that would require more than target_k owners

See unit tests in RankServiceOwners_test.py for a more detailed specification of the
expected behavior.

Notable hyperparameters and where they come from:

:param target_k: the value of k we are roughly targeting (set by discussion with PM)
:param k_tol: our tolerance for k, or how many additional owners above `target_k` we are willing to show
(set by intuition/discussion with PM)
:param a_tol: max expected absolute difference between two scores in the same "tier"
(set somewhat arbitrarily/by intuition; see unit tests)
:param min_score_proportion: the targeted min proportion of the score mass
(identified using a gridsearch over values to find best outcome on unit tests)
"""
# get up to target_k scores that comprise the desired score proportion
scores_desc = list(sorted(scores, reverse=True))
min_score_proportion = sum(scores_desc) * min_score_proportion
k = 0
cumulative_score = 0.0
while cumulative_score < min_score_proportion and k < target_k:
cumulative_score += scores_desc[k]
k += 1

# score values are likely groupable into "tiers"; try to find a cutoff between tiers
# look for the end of the next element's tier (may be the current or next tier),
# where a tier is (arbitrarily) defined by an absolute difference of `a_tol`
tier_index = k
while tier_index < len(scores_desc) and math.isclose(scores_desc[tier_index], scores_desc[tier_index - 1], abs_tol=a_tol):
tier_index += 1

# add additional score(s) if within tolerance for k
if math.isclose(target_k, tier_index, abs_tol=k_tol):
k = tier_index

return k
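A possible way to address the review comment above about verifying that the hyperparameters are within expected bounds; this is only a sketch (the helper name, the chosen bounds, and the use of ValueError are assumptions, not part of the merged change):

```python
def _validate_get_k_args(target_k: int, k_tol: int, a_tol: float,
                         min_score_proportion: float) -> None:
    """Hypothetical defensive checks for _get_k's hyperparameters."""
    if target_k < 1:
        raise ValueError(f"target_k must be a positive integer, got {target_k}")
    if k_tol < 0:
        raise ValueError(f"k_tol must be non-negative, got {k_tol}")
    if a_tol < 0:
        raise ValueError(f"a_tol must be non-negative, got {a_tol}")
    if not 0.0 <= min_score_proportion <= 1.0:
        raise ValueError(f"min_score_proportion must be in [0, 1], got {min_score_proportion}")
```

_get_k could call such a helper before sorting the scores, so a misconfigured value like k_tol=-3 fails loudly instead of silently shrinking the result.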


def main():
try:
unranked = demisto.args().get("owners", [])
RankServiceOwners_test.py
@@ -1,11 +1,12 @@
import demistomock as demisto # noqa: F401
import pytest
import unittest
from RankServiceOwners import score, main, rank, _canonicalize, aggregate
from RankServiceOwners import score, main, rank, _canonicalize, aggregate, _get_k


@pytest.mark.parametrize('owners,expected_out', [
(
# returned in sorted order
[
{
'Name': 'bob', 'Email': 'bob@example.com', 'Source': '',
@@ -27,6 +28,61 @@
},
]
),
(
# wraps one test case from _get_k
[
{
'Name': 'a', 'Email': 'a@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 10, 'Justification': ''
},
{
'Name': 'b', 'Email': 'b@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'c', 'Email': 'c@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'd', 'Email': 'd@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'e', 'Email': 'e@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'f', 'Email': 'f@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
],
[
{
'Name': 'a', 'Email': 'a@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 10, 'Justification': ''
},
{
'Name': 'b', 'Email': 'b@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'c', 'Email': 'c@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'd', 'Email': 'd@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'e', 'Email': 'e@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'f', 'Email': 'f@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
]
),
])
def test_rank(owners, expected_out):
assert rank(owners) == expected_out
@@ -395,3 +451,56 @@ def test_main(mocker, owners, expected_out, capfd):
# Verify the output value was set
expected_calls_to_mock_object = [unittest.mock.call('setAlert', {'asmserviceowner': expected_out})]
assert demisto_execution_mock.call_args_list == expected_calls_to_mock_object


def test_get_k():
"""
These cases are designed to specify the intuition we are trying to implement with the algorithm.
They are specific to a target value of 5; if target_k changes, these tests should be updated to reflect that.


The docstring should be generic, so mentioning 5 is not appropriate here unless you're actually setting that parameter in _get_k -- instead you might mention that this test verifies the default case. (The place to note "5" is in the docstring for _get_k -- there it is worth flagging that all the default values are tuned for k=5.)

We assert that if the algorithm matches our intuition at least 80% of the time, it's probably fine.

See function documentation for explanation of hyperparameters and their defaults.
"""

# The first value in each case is the list of scores output by the model (one per owner)
# and the second value is the expected k
cases = [
# If smallish set of owners, return all or find obvious cutoff
([1], 1),
([1, 1], 2),
([1, 1, 1], 3),
([10, 1, 1], 3),
([1, 1, 1, 1], 4),
([10, 1, 1, 1], 4),
([10, 10, 1, 1], 4), # or 2; either seems fine
([10, 10, 1, 1], 2), # or 4; either seems fine
([1, 1, 1, 1, 1], 5),
([10, 1, 1, 1, 1], 5),
([10, 10, 1, 1, 1], 2),
([1, 1, 1, 1, 1, 1], 6),
([1, 1, 1, 1, 1, 1, 1], 7),

# If larger set of owners, return top handful or find obvious cutoff
([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 5),
([10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 5),
([10, 10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 5), # or 2; either seems fine
([10, 10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 2), # or 5; either seems fine
([10, 10, 10, 1, 1, 1, 1, 1, 1, 1, 1, 1], 3),
([10, 10, 10, 10, 1, 1, 1, 1, 1, 1, 1, 1], 4),
([10, 10, 10, 10, 10, 1, 1, 1, 1, 1, 1, 1], 5),
([100, 10, 10, 10, 10, 1, 1, 1, 1, 1, 1, 1], 5),
([100, 10, 10, 10, 10, 10, 1, 1, 1, 1, 1, 1], 6),

# Do something reasonable for non-obvious cutoffs
([10, 9, 8, 7, 6, 5, 4, 3, 2, 1], 5),
([19, 17, 15, 13, 11, 9, 7, 5, 3, 1], 5),

# Do something reasonable for larger scales
([500, 200, 100, 50, 25, 10, 1], 3),
]
num_correct = 0
for scores, expected_k in cases:
if _get_k(scores) == expected_k:
num_correct += 1

assert (num_correct / len(cases)) >= 0.8
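As a sanity check on the cases above, here is a hand trace of one of the "non-obvious cutoff" inputs under the default hyperparameters (target_k=5, k_tol=2, a_tol=1.0, min_score_proportion=0.75); the intermediate numbers are worked out by hand for illustration:

```python
# Hand trace of a "non-obvious cutoff" case with the default hyperparameters.
scores = [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]  # total mass = 55; required mass = 0.75 * 55 = 41.25
# cumulative sums: 10, 19, 27, 34, 40 -> the first loop stops at k = target_k = 5
# tier scan: every adjacent gap is 1 <= a_tol, so tier_index runs all the way to 10;
# 10 is not within k_tol = 2 of target_k = 5, so the tier extension is rejected and k stays 5
assert _get_k(scores) == 5
```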
2 changes: 1 addition & 1 deletion Packs/CortexAttackSurfaceManagement/pack_metadata.json
@@ -2,7 +2,7 @@
"name": "Cortex Attack Surface Management",
"description": "Content for working with Attack Surface Management (ASM).",
"support": "xsoar",
"currentVersion": "1.6.19",
"currentVersion": "1.6.20",
"author": "Cortex XSOAR",
"url": "https://www.paloaltonetworks.com/cortex",
"email": "",