Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ASM] - Expandr 4735 #27624

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Packs/CortexAttackSurfaceManagement/ReleaseNotes/1_6_21.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#### Scripts

##### RankServiceOwners

- Updated the script to return a high-confidence set of most likely owners based on their relative ranking scores.
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
"""


from typing import Dict, List, Any
from typing import Any
from collections.abc import Iterable
import traceback
from itertools import groupby
import math

STRING_DELIMITER = ' | ' # delimiter used for joining Source fields and any additional fields of type string


def score(owners: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
def score(owners: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""
Owner score is the number of observations on that owner divided by the max number of observations
for any owner in the list
Expand All @@ -27,14 +29,18 @@ def score(owners: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
return owners


def rank(owners: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
def rank(owners: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""
Sort owners by ranking score
Sort owners by ranking score and use data-driven algorithm to return the top k,
where k is a dynamic value based on the relative scores

See _get_k for hyperparameters that can be used to adjust the target value of k
"""
return sorted(owners, key=lambda x: x['Ranking Score'], reverse=True)
k = _get_k(scores=(owner['Ranking Score'] for owner in owners))
return sorted(owners, key=lambda x: x['Ranking Score'], reverse=True)[:k]


def justify(owners: List[Dict[str, str]]) -> List[Dict[str, str]]:
def justify(owners: list[dict[str, str]]) -> list[dict[str, str]]:
"""
For now, `Justification` is the same as `Source`; in the future, will sophisticate
"""
Expand All @@ -43,7 +49,7 @@ def justify(owners: List[Dict[str, str]]) -> List[Dict[str, str]]:
return owners


def _canonicalize(owner: Dict[str, Any]) -> Dict[str, Any]:
def _canonicalize(owner: dict[str, Any]) -> dict[str, Any]:
"""
Canonicalizes an owner dictionary and adds a deduplication key
`Canonicalization` whose value is either:
Expand All @@ -62,7 +68,7 @@ def _canonicalize(owner: Dict[str, Any]) -> Dict[str, Any]:
return owner


def canonicalize(owners: List[Dict[str, str]]) -> List[Dict[str, Any]]:
def canonicalize(owners: list[dict[str, str]]) -> list[dict[str, Any]]:
"""
Calls _canonicalize on each well-formatted owner; drops and logs malformed inputs
"""
Expand All @@ -78,7 +84,7 @@ def canonicalize(owners: List[Dict[str, str]]) -> List[Dict[str, Any]]:
return canonicalized


def aggregate(owners: List[Dict[str, str]]) -> List[Dict[str, Any]]:
def aggregate(owners: list[dict[str, str]]) -> list[dict[str, Any]]:
"""
Aggregate owners by their canonicalization.

Expand All @@ -101,7 +107,7 @@ def aggregate(owners: List[Dict[str, str]]) -> List[Dict[str, Any]]:
name = names[0] if names else ''
# aggregate Source by union
source = STRING_DELIMITER.join(sorted(
set(owner.get('source', '') for owner in duplicates if owner.get('source', ''))
{owner.get('source', '') for owner in duplicates if owner.get('source', '')}
))
# take max Timestamp if there's at least one; else empty string
timestamps = sorted(
Expand All @@ -117,8 +123,8 @@ def aggregate(owners: List[Dict[str, str]]) -> List[Dict[str, Any]]:
}

# aggregate remaining keys according to type
all_keys = set(k for owner in duplicates for k in owner.keys())
keys_to_types = {k: type(owner[k]) for owner in duplicates for k in owner.keys()}
all_keys = {k for owner in duplicates for k in owner}
keys_to_types = {k: type(owner[k]) for owner in duplicates for k in owner}
other_keys = []
for key in all_keys:
if key.lower() not in {'name', 'email', 'source', 'timestamp', 'canonicalization'}:
Expand All @@ -127,7 +133,7 @@ def aggregate(owners: List[Dict[str, str]]) -> List[Dict[str, Any]]:
if keys_to_types[other] == str:
# union over strings
owner[other] = STRING_DELIMITER.join(sorted(
set(owner.get(other, '') for owner in duplicates if owner.get(other, ''))
{owner.get(other, '') for owner in duplicates if owner.get(other, '')}
))
elif keys_to_types[other] in (int, float):
# max over numerical types
Expand All @@ -139,6 +145,63 @@ def aggregate(owners: List[Dict[str, str]]) -> List[Dict[str, Any]]:
return deduped


def _get_k(
scores: Iterable[float],
target_k: int = 5,
k_tol: int = 2,
a_tol: float = 1.0,
min_score_proportion: float = 0.75
Comment on lines +149 to +153

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you defensively verify that these values are within expected bounds? (for instance, that k_tol isn't -3.)

) -> int:
"""
Return a value of k such that:
- target_k >= k <= target_k + k_tol
- the top k scores comprise minimum specified proportion of the total score mass

See unit tests in RankServiceOwners_test.py for a more detailed specification of the
expected behavior.

Notable hyperparameters (which are tuned to target_k=5) and where they come from:

:param target_k: the value of k we are roughly targeting (set by discussion with PM)
:param k_tol: our tolerance for k, or how many additional owners above `target_k` we are willing to show
(set by intuition/discussion with PM)
:param a_tol: max expected absolute different between two scores in the same "tier"
(set by intuition; see unit tests)
:param min_score_proportion: the targeted min proportion of the score mass
(identified using a gridsearch over values to find best outcome on unit tests)
"""
if target_k < 0:
raise ValueError("target_k must be non-negative")
if k_tol < 0:
raise ValueError("k_tol must be non-negative")
if a_tol < 0:
raise ValueError("a_tol must be non-negative")
if min_score_proportion < 0 or min_score_proportion > 1:
raise ValueError("min_score_proportion must be a value between 0 and 1")

# get up to target_k scores that comprise the desired score proportion
scores_desc = sorted(scores, reverse=True)
min_score_proportion = sum(scores_desc) * min_score_proportion
k = 0
cumulative_score = 0.0
while cumulative_score < min_score_proportion and k < target_k:
cumulative_score += scores_desc[k]
k += 1

# score values are likely groupable into "tiers"; try to find a cutoff between tiers
# look for the end of the next element's tier (may be the current or next tier),
# where a tier is (arbitrarily) defined by an absolute difference of `a_tol`
tier_index = k
while tier_index < len(scores_desc) and math.isclose(scores_desc[tier_index], scores_desc[tier_index - 1], abs_tol=a_tol):
tier_index += 1

# add additional score(s) if within tolerance for k
if math.isclose(target_k, tier_index, abs_tol=k_tol):
k = tier_index

return k


def main():
try:
unranked = demisto.args().get("owners", [])
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import demistomock as demisto # noqa: F401
import pytest
import unittest
from RankServiceOwners import score, main, rank, _canonicalize, aggregate
from RankServiceOwners import score, main, rank, _canonicalize, aggregate, _get_k
from contextlib import nullcontext as does_not_raise


@pytest.mark.parametrize('owners,expected_out', [
(
# returned in sorted order
[
{
'Name': 'bob', 'Email': 'bob@example.com', 'Source': '',
Expand All @@ -27,6 +29,61 @@
},
]
),
(
# wraps one test case from _get_k
[
{
'Name': 'a', 'Email': 'a@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 10, 'Justification': ''
},
{
'Name': 'b', 'Email': 'b@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'c', 'Email': 'c@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'd', 'Email': 'd@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'e', 'Email': 'e@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'f', 'Email': 'f@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
],
[
{
'Name': 'a', 'Email': 'a@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 10, 'Justification': ''
},
{
'Name': 'b', 'Email': 'b@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'c', 'Email': 'c@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'd', 'Email': 'd@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'e', 'Email': 'e@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
{
'Name': 'f', 'Email': 'f@example.com', 'Source': '',
'Timestamp': '', 'Ranking Score': 1, 'Justification': ''
},
]
),
])
def test_rank(owners, expected_out):
assert rank(owners) == expected_out
Expand Down Expand Up @@ -395,3 +452,76 @@ def test_main(mocker, owners, expected_out, capfd):
# Verify the output value was set
expected_calls_to_mock_object = [unittest.mock.call('setAlert', {'asmserviceowner': expected_out})]
assert demisto_execution_mock.call_args_list == expected_calls_to_mock_object


def test_get_k():
    """
    These cases encode the intuition the k-selection algorithm is meant to capture
    and exercise its default hyperparameters.
    If the algorithm agrees with that intuition on at least 80% of the cases,
    we consider the defaults acceptable.

    See the function documentation for what each hyperparameter means and its default.
    """

    # Each case pairs the per-owner scores emitted by the model with the k we expect back
    cases = [
        # If smallish set of owners, return all or find obvious cutoff
        ([1], 1),
        ([1, 1], 2),
        ([1, 1, 1], 3),
        ([10, 1, 1], 3),
        ([1, 1, 1, 1], 4),
        ([10, 1, 1, 1], 4),
        ([10, 10, 1, 1], 4),  # or 2; either seems fine
        ([10, 10, 1, 1], 2),  # or 4; either seems fine
        ([1, 1, 1, 1, 1], 5),
        ([10, 1, 1, 1, 1], 5),
        ([10, 10, 1, 1, 1], 2),
        ([1, 1, 1, 1, 1, 1], 6),
        ([1, 1, 1, 1, 1, 1, 1], 7),

        # If larger set of owners, return top handful or find obvious cutoff
        ([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 5),
        ([10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 5),
        ([10, 10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 5),  # or 2; either seems fine
        ([10, 10, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 2),  # or 5; either seems fine
        ([10, 10, 10, 1, 1, 1, 1, 1, 1, 1, 1, 1], 3),
        ([10, 10, 10, 10, 1, 1, 1, 1, 1, 1, 1, 1], 4),
        ([10, 10, 10, 10, 10, 1, 1, 1, 1, 1, 1, 1], 5),
        ([100, 10, 10, 10, 10, 1, 1, 1, 1, 1, 1, 1], 5),
        ([100, 10, 10, 10, 10, 10, 1, 1, 1, 1, 1, 1], 6),

        # Do something reasonable for non-obvious cutoffs
        ([10, 9, 8, 7, 6, 5, 4, 3, 2, 1], 5),
        ([19, 17, 15, 13, 11, 9, 7, 5, 3, 1], 5),

        # Do something reasonable for larger scales
        ([500, 200, 100, 50, 25, 10, 1], 3),
    ]

    # count agreements with a generator-sum instead of a manual counter loop
    num_correct = sum(1 for scores, expected_k in cases if _get_k(scores) == expected_k)
    assert (num_correct / len(cases)) >= 0.8


@pytest.mark.parametrize('target_k, k_tol, a_tol, min_score_proportion, expected_raises', [
    (-1, 2, 1.0, 0.75, pytest.raises(ValueError, match="target_k must be non-negative")),
    (5, -1, 1.0, 0.75, pytest.raises(ValueError, match="k_tol must be non-negative")),
    (5, 2, -1, 0.75, pytest.raises(ValueError, match="a_tol must be non-negative")),
    (5, 2, 1.0, -1, pytest.raises(ValueError, match="min_score_proportion must be a value between 0 and 1")),
    (5, 2, 1.0, 1.1, pytest.raises(ValueError, match="min_score_proportion must be a value between 0 and 1")),
    (5, 2, 1.0, 0.75, does_not_raise()),
])
def test_get_k_bad_values(target_k, k_tol, a_tol, min_score_proportion, expected_raises):
    """Hyperparameters outside their documented bounds must raise ValueError; in-bounds values must not."""
    hyperparams = {
        'target_k': target_k,
        'k_tol': k_tol,
        'a_tol': a_tol,
        'min_score_proportion': min_score_proportion,
    }
    with expected_raises:
        _get_k([1, 1, 1], **hyperparams)
2 changes: 1 addition & 1 deletion Packs/CortexAttackSurfaceManagement/pack_metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "Cortex Attack Surface Management",
"description": "Content for working with Attack Surface Management (ASM).",
"support": "xsoar",
"currentVersion": "1.6.20",
"currentVersion": "1.6.21",
"author": "Cortex XSOAR",
"url": "https://www.paloaltonetworks.com/cortex",
"email": "",
Expand Down