[ASM] - Expandr 4735 (#27624) (#27951)
* Update ranking algorithm for Service Ownership

Currently, we score all owners and return the full sorted list in ${alert.asmserviceowner}; instead, we want ${alert.asmserviceowner} to contain a smaller, high-confidence set of owners that we would be comfortable notifying via email.

Test plan: pytest + manual testing in tenant

* Add release notes

* Verify hyperparameters and update docs

* Add unit test for value-checking in _get_k

* Update release notes

* Manually apply the diff from the pre-commit check output: use built-in types for type hints and set comprehensions

---------

Co-authored-by: kball-pa <131012047+kball-pa@users.noreply.github.com>
Co-authored-by: michal-dagan <109464765+michal-dagan@users.noreply.github.com>
3 people authored Jul 6, 2023
1 parent ca47686 commit d0265a4
Showing 4 changed files with 213 additions and 15 deletions.
5 changes: 5 additions & 0 deletions Packs/CortexAttackSurfaceManagement/ReleaseNotes/1_6_21.md
@@ -0,0 +1,5 @@
#### Scripts

##### RankServiceOwners

- Updated the script to return a high-confidence set of most likely owners based on their relative ranking scores.
Packs/CortexAttackSurfaceManagement/Scripts/RankServiceOwners/RankServiceOwners.py
@@ -5,14 +5,16 @@
"""


-from typing import Dict, List, Any
+from typing import Any
+from collections.abc import Iterable
import traceback
from itertools import groupby
import math

STRING_DELIMITER = ' | ' # delimiter used for joining Source fields and any additional fields of type string


-def score(owners: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+def score(owners: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""
Owner score is the number of observations on that owner divided by the max number of observations
for any owner in the list
@@ -27,14 +29,18 @@ def score(owners: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
return owners
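
The body of score is collapsed in this hunk; below is a minimal sketch of the normalization the docstring describes, assuming a hypothetical 'Count' field holds each owner's observation count (not part of this commit):

    def _score_sketch(owners):
        # Assumed: each owner dict carries a 'Count' of observations (hypothetical field)
        max_count = max((owner.get('Count', 0) for owner in owners), default=0)
        for owner in owners:
            # Normalize by the max count across all owners, as the docstring describes
            owner['Ranking Score'] = owner.get('Count', 0) / max_count if max_count else 0.0
        return owners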


-def rank(owners: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+def rank(owners: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""
-Sort owners by ranking score
+Sort owners by ranking score and use a data-driven algorithm to return the top k,
+where k is a dynamic value based on the relative scores
+See _get_k for hyperparameters that can be used to adjust the target value of k
"""
-return sorted(owners, key=lambda x: x['Ranking Score'], reverse=True)
+k = _get_k(scores=(owner['Ranking Score'] for owner in owners))
+return sorted(owners, key=lambda x: x['Ranking Score'], reverse=True)[:k]
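
A minimal usage sketch of the new truncation behavior, mirroring the unit-test case added below (owner dicts trimmed to the scored field):

    owners = [{'Name': n, 'Ranking Score': s}
              for n, s in [('a', 10), ('b', 1), ('c', 1), ('d', 1), ('e', 1), ('f', 1)]]
    top = rank(owners)
    # _get_k sees [10, 1, 1, 1, 1, 1]: the top 3 scores reach the 0.75 score
    # proportion, the trailing tier of 1s extends the cutoff to index 6, and
    # |target_k - 6| = 1 <= k_tol, so k = 6 and all six owners are returned.
    assert len(top) == 6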


-def justify(owners: List[Dict[str, str]]) -> List[Dict[str, str]]:
+def justify(owners: list[dict[str, str]]) -> list[dict[str, str]]:
"""
For now, `Justification` is the same as `Source`; in the future, will sophisticate
"""
@@ -43,7 +49,7 @@ def justify(owners: List[Dict[str, str]]) -> List[Dict[str, str]]:
return owners


-def _canonicalize(owner: Dict[str, Any]) -> Dict[str, Any]:
+def _canonicalize(owner: dict[str, Any]) -> dict[str, Any]:
"""
Canonicalizes an owner dictionary and adds a deduplication key
`Canonicalization` whose value is either:
@@ -62,7 +68,7 @@ def _canonicalize(owner: Dict[str, Any]) -> Dict[str, Any]:
return owner
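
The docstring's alternatives are collapsed in this hunk; one plausible reading (an assumption, not the shipped code) is that the deduplication key prefers a normalized email and falls back to a normalized name:

    def _canonicalize_sketch(owner):
        # Assumed precedence: email first, then name (hypothetical helper)
        if owner.get('email'):
            owner['Canonicalization'] = owner['email'].strip().lower()
        elif owner.get('name'):
            owner['Canonicalization'] = owner['name'].strip().lower()
        else:
            owner['Canonicalization'] = ''
        return owner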


-def canonicalize(owners: List[Dict[str, str]]) -> List[Dict[str, Any]]:
+def canonicalize(owners: list[dict[str, str]]) -> list[dict[str, Any]]:
"""
Calls _canonicalize on each well-formatted owner; drops and logs malformed inputs
"""
@@ -78,7 +84,7 @@ def canonicalize(owners: List[Dict[str, str]]) -> List[Dict[str, Any]]:
return canonicalized


-def aggregate(owners: List[Dict[str, str]]) -> List[Dict[str, Any]]:
+def aggregate(owners: list[dict[str, str]]) -> list[dict[str, Any]]:
"""
Aggregate owners by their canonicalization.
@@ -101,7 +107,7 @@ def aggregate(owners: List[Dict[str, str]]) -> List[Dict[str, Any]]:
name = names[0] if names else ''
# aggregate Source by union
source = STRING_DELIMITER.join(sorted(
-set(owner.get('source', '') for owner in duplicates if owner.get('source', ''))
+{owner.get('source', '') for owner in duplicates if owner.get('source', '')}
))
# take max Timestamp if there's at least one; else empty string
timestamps = sorted(
@@ -117,8 +123,8 @@ def aggregate(owners: List[Dict[str, str]]) -> List[Dict[str, Any]]:
}

# aggregate remaining keys according to type
-all_keys = set(k for owner in duplicates for k in owner.keys())
-keys_to_types = {k: type(owner[k]) for owner in duplicates for k in owner.keys()}
+all_keys = {k for owner in duplicates for k in owner}
+keys_to_types = {k: type(owner[k]) for owner in duplicates for k in owner}
other_keys = []
for key in all_keys:
if key.lower() not in {'name', 'email', 'source', 'timestamp', 'canonicalization'}:
@@ -127,7 +133,7 @@ def aggregate(owners: List[Dict[str, str]]) -> List[Dict[str, Any]]:
if keys_to_types[other] == str:
# union over strings
owner[other] = STRING_DELIMITER.join(sorted(
-set(owner.get(other, '') for owner in duplicates if owner.get(other, ''))
+{owner.get(other, '') for owner in duplicates if owner.get(other, '')}
))
elif keys_to_types[other] in (int, float):
# max over numerical types
@@ -139,6 +145,63 @@ def aggregate(owners: List[Dict[str, str]]) -> List[Dict[str, Any]]:
return deduped
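
Condensed into one hypothetical helper, the merge rules sketched by this hunk look roughly like the following (field handling simplified; the shipped code also aggregates remaining keys by type, unioning strings and taking the max of numbers):

    from itertools import groupby

    def _merge_sketch(owners):
        def canon(owner):
            return owner['Canonicalization']
        merged = []
        for _, group in groupby(sorted(owners, key=canon), key=canon):
            duplicates = list(group)
            merged.append({
                # first available name wins
                'Name': next((o.get('name', '') for o in duplicates if o.get('name')), ''),
                # Source: union of non-empty values, joined by the delimiter
                'Source': ' | '.join(sorted({o.get('source', '') for o in duplicates if o.get('source')})),
                # Timestamp: max if at least one is present, else empty string
                'Timestamp': max((o.get('timestamp', '') for o in duplicates if o.get('timestamp')), default=''),
            })
        return merged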


def _get_k(
scores: Iterable[float],
target_k: int = 5,
k_tol: int = 2,
a_tol: float = 1.0,
min_score_proportion: float = 0.75
) -> int:
"""
Return a value of k such that:
- k <= target_k + k_tol
- the top k scores comprise the specified minimum proportion of the total score mass
See unit tests in RankServiceOwners_test.py for a more detailed specification of the
expected behavior.
Notable hyperparameters (which are tuned to target_k=5) and where they come from:
:param target_k: the value of k we are roughly targeting (set by discussion with PM)
:param k_tol: our tolerance for k, or how many additional owners above `target_k` we are willing to show
(set by intuition/discussion with PM)
:param a_tol: max expected absolute difference between two scores in the same "tier"
(set by intuition; see unit tests)
:param min_score_proportion: the targeted min proportion of the score mass
(identified using a gridsearch over values to find best outcome on unit tests)
"""
if target_k < 0:
raise ValueError("target_k must be non-negative")
if k_tol < 0:
raise ValueError("k_tol must be non-negative")
if a_tol < 0:
raise ValueError("a_tol must be non-negative")
if min_score_proportion < 0 or min_score_proportion > 1:
raise ValueError("min_score_proportion must be a value between 0 and 1")

# get up to target_k scores that comprise the desired score proportion
scores_desc = sorted(scores, reverse=True)
min_score_proportion = sum(scores_desc) * min_score_proportion
k = 0
cumulative_score = 0.0
while cumulative_score < min_score_proportion and k < target_k:
cumulative_score += scores_desc[k]
k += 1

# score values are likely groupable into "tiers"; try to find a cutoff between tiers
# look for the end of the next element's tier (may be the current or next tier),
# where a tier is (arbitrarily) defined by an absolute difference of `a_tol`
tier_index = k
while tier_index < len(scores_desc) and math.isclose(scores_desc[tier_index], scores_desc[tier_index - 1], abs_tol=a_tol):
tier_index += 1

# add additional score(s) if within tolerance for k
if math.isclose(target_k, tier_index, abs_tol=k_tol):
k = tier_index

return k
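
A worked trace of _get_k with the default hyperparameters, using one of the unit-test cases added below:

    scores = [100, 10, 10, 10, 10, 1, 1, 1, 1, 1, 1, 1]  # total score mass = 147
    # 1) cumulative loop: 0.75 * 147 = 110.25; 100 + 10 + 10 = 120 crosses it, so k = 3
    # 2) tier scan (a_tol=1.0): the 10-point tier runs through index 4, so tier_index = 5
    # 3) |target_k - tier_index| = 0 <= k_tol, so k is raised to the tier boundary: 5
    assert _get_k(scores) == 5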


def main():
try:
unranked = demisto.args().get("owners", [])
Packs/CortexAttackSurfaceManagement/Scripts/RankServiceOwners/RankServiceOwners_test.py
@@ -1,11 +1,13 @@
import demistomock as demisto # noqa: F401
import pytest
import unittest
-from RankServiceOwners import score, main, rank, _canonicalize, aggregate
+from RankServiceOwners import score, main, rank, _canonicalize, aggregate, _get_k
from contextlib import nullcontext as does_not_raise


@pytest.mark.parametrize('owners,expected_out', [
(
# returned in sorted order
[
{
'Name': 'bob', 'Email': 'bob@example.com', 'Source': '',
@@ -27,6 +29,61 @@
},
]
),
(
# wraps one test case from _get_k
[
{
'Name': 'a', 'Email': 'a@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 10, 'Justification': ''
},
{
'Name': 'b', 'Email': 'b@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'c', 'Email': 'c@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'd', 'Email': 'd@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'e', 'Email': 'e@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'f', 'Email': 'f@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
],
[
{
'Name': 'a', 'Email': 'a@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 10, 'Justification': ''
},
{
'Name': 'b', 'Email': 'b@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'c', 'Email': 'c@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'd', 'Email': 'd@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'e', 'Email': 'e@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'f', 'Email': 'f@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
]
),
])
def test_rank(owners, expected_out):
assert rank(owners) == expected_out
@@ -395,3 +452,76 @@ def test_main(mocker, owners, expected_out, capfd):
# Verify the output value was set
expected_calls_to_mock_object = [unittest.mock.call('setAlert', {'asmserviceowner': expected_out})]
assert demisto_execution_mock.call_args_list == expected_calls_to_mock_object


def test_get_k():
"""
These cases are designed to specify the intuition we are trying to implement with the algorithm
and verify its default hyperparameters.
We assert that if the algorithm matches our intuition at least 80% of the time, it's probably fine.
See function documentation for explanation of hyperparameters and their defaults.
"""

# The first value in each case is the list of scores output by the model (one per owner)
# and the second value is the expected k
cases = [
# If smallish set of owners, return all or find obvious cutoff
([1], 1),
([1, 1], 2),
([1, 1, 1], 3),
([10, 1, 1], 3),
([1, 1, 1, 1], 4),
([10, 1, 1, 1], 4),
([10, 10, 1, 1], 4), # or 2; either seems fine
([10, 10, 1, 1], 2), # or 4; either seems fine
([1, 1, 1, 1, 1], 5),
([10, 1, 1, 1, 1], 5),
([10, 10, 1, 1, 1], 2),
([1, 1, 1, 1, 1, 1], 6),
([1, 1, 1, 1, 1, 1, 1], 7),

# If larger set of owners, return top handful or find obvious cutoff
([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 5),
([10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 5),
([10, 10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 5), # or 2; either seems fine
([10, 10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 2), # or 5; either seems fine
([10, 10, 10, 1, 1, 1, 1, 1, 1, 1, 1, 1], 3),
([10, 10, 10, 10, 1, 1, 1, 1, 1, 1, 1, 1], 4),
([10, 10, 10, 10, 10, 1, 1, 1, 1, 1, 1, 1], 5),
([100, 10, 10, 10, 10, 1, 1, 1, 1, 1, 1, 1], 5),
([100, 10, 10, 10, 10, 10, 1, 1, 1, 1, 1, 1], 6),

# Do something reasonable for non-obvious cutoffs
([10, 9, 8, 7, 6, 5, 4, 3, 2, 1], 5),
([19, 17, 15, 13, 11, 9, 7, 5, 3, 1], 5),

# Do something reasonable for larger scales
([500, 200, 100, 50, 25, 10, 1], 3),
]
num_correct = 0
for scores, expected_k in cases:
if _get_k(scores) == expected_k:
num_correct += 1

assert (num_correct / len(cases)) >= 0.8


@pytest.mark.parametrize('target_k, k_tol, a_tol, min_score_proportion, expected_raises', [
(-1, 2, 1.0, 0.75, pytest.raises(ValueError, match="target_k must be non-negative")),
(5, -1, 1.0, 0.75, pytest.raises(ValueError, match="k_tol must be non-negative")),
(5, 2, -1, 0.75, pytest.raises(ValueError, match="a_tol must be non-negative")),
(5, 2, 1.0, -1, pytest.raises(ValueError, match="min_score_proportion must be a value between 0 and 1")),
(5, 2, 1.0, 1.1, pytest.raises(ValueError, match="min_score_proportion must be a value between 0 and 1")),
(5, 2, 1.0, 0.75, does_not_raise()),
])
def test_get_k_bad_values(target_k, k_tol, a_tol, min_score_proportion, expected_raises):
scores = [1, 1, 1]
with expected_raises:
assert _get_k(
scores,
target_k=target_k,
k_tol=k_tol,
a_tol=a_tol,
min_score_proportion=min_score_proportion,
)
2 changes: 1 addition & 1 deletion Packs/CortexAttackSurfaceManagement/pack_metadata.json
@@ -2,7 +2,7 @@
"name": "Cortex Attack Surface Management",
"description": "Content for working with Attack Surface Management (ASM).",
"support": "xsoar",
"currentVersion": "1.6.20",
"currentVersion": "1.6.21",
"author": "Cortex XSOAR",
"url": "https://www.paloaltonetworks.com/cortex",
"email": "",
