Skip to content

Test outputs #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 29, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 114 additions & 54 deletions Packs/Code42/Integrations/Code42/Code42_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,8 @@
"status": "OPEN",
"cloudUsernames": ["test@example.com"],
"totalBytes": 139856482,
"numEvents": 11
"numEvents": 11,
"departureDate": "2020-07-20"
},
{
"type$": "DEPARTING_EMPLOYEE_V2",
Expand Down Expand Up @@ -987,6 +988,10 @@
"""


_TEST_USER_ID = "123412341234123412" # value found in GET_USER_RESPONSE
_TEST_USERNAME = "user1@example.com"


@pytest.fixture
def code42_sdk_mock(mocker):
code42_mock = mocker.MagicMock(spec=SDKClient)
Expand Down Expand Up @@ -1071,6 +1076,20 @@ def get_empty_detectionlist_response(mocker, base_text):
return create_mock_code42_sdk_response_generator(mocker, [no_employees_response_text])


def assert_departingemployee_outputs_match_response(outputs_list, response_items):
assert_detection_list_outputs_match_response_items(outputs_list, response_items)
for i in range(0, len(outputs_list)):
assert outputs_list[i]["DepartureDate"] == response_items[i].get("departureDate")


def assert_detection_list_outputs_match_response_items(outputs_list, response_items):
assert len(outputs_list) == len(response_items)
for i in range(0, len(outputs_list)):
assert outputs_list[i]["Username"] == response_items[i]["userName"]
assert outputs_list[i]["UserID"] == response_items[i]["userId"]
assert outputs_list[i]["Note"] == response_items[i]["notes"]


"""TESTS"""


Expand Down Expand Up @@ -1148,45 +1167,52 @@ def test_map_to_file_context():

def test_alert_get_command(code42_alerts_mock):
client = create_client(code42_alerts_mock)
_, _, res = alert_get_command(client, {"id": "36fb8ca5-0533-4d25-9763-e09d35d60610"})
_, outputs, res = alert_get_command(client, {"id": "36fb8ca5-0533-4d25-9763-e09d35d60610"})
assert res["ruleId"] == "4576576e-13cb-4f88-be3a-ee77739de649"
assert outputs == {"Code42.SecurityAlert": [MOCK_CODE42_ALERT_CONTEXT[0]]}


def test_alert_resolve_command(code42_alerts_mock):
client = create_client(code42_alerts_mock)
_, _, res = alert_resolve_command(client, {"id": "36fb8ca5-0533-4d25-9763-e09d35d60610"})
_, outputs, res = alert_resolve_command(client, {"id": "36fb8ca5-0533-4d25-9763-e09d35d60610"})
assert res["id"] == "36fb8ca5-0533-4d25-9763-e09d35d60610"
assert outputs == {"Code42.SecurityAlert": [MOCK_CODE42_ALERT_CONTEXT[0]]}


def test_departingemployee_add_command(code42_sdk_mock):
client = create_client(code42_sdk_mock)
_, _, res = departingemployee_add_command(
date = "2020-01-01"
note = "Dummy note"
_, outputs, res = departingemployee_add_command(
client,
{"username": "user1@example.com", "departuredate": "2020-01-01", "note": "Dummy note"},
{"username": _TEST_USERNAME, "departuredate": date, "note": note},
)
expected_user_id = "123412341234123412" # value found in GET_USER_RESPONSE
assert res == expected_user_id
add_func = code42_sdk_mock.detectionlists.departing_employee.add
add_func.assert_called_once_with(expected_user_id, departure_date="2020-01-01")
code42_sdk_mock.detectionlists.update_user_notes.assert_called_once_with(
expected_user_id, "Dummy note"
)
assert res == _TEST_USER_ID
assert outputs["Code42.DepartingEmployee"]["DepartureDate"] == date
assert outputs["Code42.DepartingEmployee"]["Note"] == note
assert outputs["Code42.DepartingEmployee"]["Username"] == _TEST_USERNAME
assert outputs["Code42.DepartingEmployee"]["UserID"] == _TEST_USER_ID
add_func.assert_called_once_with(_TEST_USER_ID, departure_date=date)
code42_sdk_mock.detectionlists.update_user_notes.assert_called_once_with(_TEST_USER_ID, note)


def test_departingemployee_remove_command(code42_sdk_mock):
client = create_client(code42_sdk_mock)
_, _, res = departingemployee_remove_command(client, {"username": "user1@example.com"})
expected = "123412341234123412" # value found in GET_USER_RESPONSE
assert res == expected
code42_sdk_mock.detectionlists.departing_employee.remove.assert_called_once_with(expected)
_, outputs, res = departingemployee_remove_command(client, {"username": _TEST_USERNAME})
assert res == _TEST_USER_ID
code42_sdk_mock.detectionlists.departing_employee.remove.assert_called_once_with(_TEST_USER_ID)
assert outputs["Code42.DepartingEmployee"]["Username"] == _TEST_USERNAME
assert outputs["Code42.DepartingEmployee"]["UserID"] == _TEST_USER_ID


def test_departingemployee_get_all_command(code42_departing_employee_mock):
client = create_client(code42_departing_employee_mock)
_, _, res = departingemployee_get_all_command(client, {})
_, outputs, res = departingemployee_get_all_command(client, {})
outputs_list = outputs["Code42.DepartingEmployee(val.UserID && val.UserID == obj.UserID)"]
expected = json.loads(MOCK_GET_ALL_DEPARTING_EMPLOYEES_RESPONSE)["items"]
assert res == expected
assert code42_departing_employee_mock.detectionlists.departing_employee.get_all.call_count == 1
assert_departingemployee_outputs_match_response(outputs_list, expected)


def test_departingemployee_get_all_command_gets_employees_from_multiple_pages(
Expand All @@ -1202,13 +1228,14 @@ def test_departingemployee_get_all_command_gets_employees_from_multiple_pages(
employee_page_generator
)
client = create_client(code42_departing_employee_mock)

_, _, res = departingemployee_get_all_command(client, {})
_, outputs, res = departingemployee_get_all_command(client, {})
outputs_list = outputs["Code42.DepartingEmployee(val.UserID && val.UserID == obj.UserID)"]

# Expect to have employees from 3 pages in the result
expected_page = json.loads(MOCK_GET_ALL_DEPARTING_EMPLOYEES_RESPONSE)["items"]
expected = expected_page + expected_page + expected_page
assert res == expected
assert_departingemployee_outputs_match_response(outputs_list, res)


def test_departingemployee_get_all_command_gets_number_of_employees_equal_to_results_param(
Expand Down Expand Up @@ -1240,7 +1267,7 @@ def test_departingemployee_get_all_command_when_no_employees(
no_employees_response
)
client = create_client(code42_departing_employee_mock)
_, _, res = departingemployee_get_all_command(
_, outputs, res = departingemployee_get_all_command(
client,
{
"risktags": [
Expand All @@ -1250,41 +1277,48 @@ def test_departingemployee_get_all_command_when_no_employees(
]
},
)
outputs_list = outputs["Code42.DepartingEmployee(val.UserID && val.UserID == obj.UserID)"]

# Only first employee has the given risk tags
expected = []
assert res == expected
assert code42_departing_employee_mock.detectionlists.departing_employee.get_all.call_count == 1
assert_departingemployee_outputs_match_response(outputs_list, res)


def test_highriskemployee_add_command(code42_high_risk_employee_mock):
client = create_client(code42_high_risk_employee_mock)
_, _, res = highriskemployee_add_command(
client, {"username": "user1@example.com", "note": "Dummy note"}
_, outputs, res = highriskemployee_add_command(
client, {"username": _TEST_USERNAME, "note": "Dummy note"}
)
expected_user_id = "123412341234123412" # value found in GET_USER_RESPONSE
assert res == expected_user_id
assert res == _TEST_USER_ID
assert outputs["Code42.HighRiskEmployee"]["UserID"] == _TEST_USER_ID
assert outputs["Code42.HighRiskEmployee"]["Username"] == _TEST_USERNAME
code42_high_risk_employee_mock.detectionlists.high_risk_employee.add.assert_called_once_with(
expected_user_id
_TEST_USER_ID
)
code42_high_risk_employee_mock.detectionlists.update_user_notes.assert_called_once_with(
expected_user_id, "Dummy note"
_TEST_USER_ID, "Dummy note"
)


def test_highriskemployee_remove_command(code42_sdk_mock):
client = create_client(code42_sdk_mock)
_, _, res = highriskemployee_remove_command(client, {"username": "user1@example.com"})
expected = "123412341234123412" # value found in GET_USER_RESPONSE
assert res == expected
code42_sdk_mock.detectionlists.high_risk_employee.remove.assert_called_once_with(expected)
_, outputs, res = highriskemployee_remove_command(client, {"username": _TEST_USERNAME})
assert res == _TEST_USER_ID
assert outputs["Code42.HighRiskEmployee"]["UserID"] == _TEST_USER_ID
assert outputs["Code42.HighRiskEmployee"]["Username"] == _TEST_USERNAME
code42_sdk_mock.detectionlists.high_risk_employee.remove.assert_called_once_with(_TEST_USER_ID)


def test_highriskemployee_get_all_command(code42_high_risk_employee_mock):
client = create_client(code42_high_risk_employee_mock)
_, _, res = highriskemployee_get_all_command(client, {})
_, outputs, res = highriskemployee_get_all_command(client, {})
outputs_list = outputs["Code42.HighRiskEmployee(val.UserID && val.UserID == obj.UserID)"]
expected = json.loads(MOCK_GET_ALL_HIGH_RISK_EMPLOYEES_RESPONSE)["items"]
assert res == expected
assert code42_high_risk_employee_mock.detectionlists.high_risk_employee.get_all.call_count == 1
assert_detection_list_outputs_match_response_items(outputs_list, expected)


def test_highriskemployee_get_all_command_gets_employees_from_multiple_pages(
Expand All @@ -1301,26 +1335,30 @@ def test_highriskemployee_get_all_command_gets_employees_from_multiple_pages(
)
client = create_client(code42_high_risk_employee_mock)

_, _, res = highriskemployee_get_all_command(client, {"username": "user1@example.com"})
_, outputs, res = highriskemployee_get_all_command(client, {"username": _TEST_USERNAME})
outputs_list = outputs["Code42.HighRiskEmployee(val.UserID && val.UserID == obj.UserID)"]

# Expect to have employees from 3 pages in the result
expected_page = json.loads(MOCK_GET_ALL_HIGH_RISK_EMPLOYEES_RESPONSE)["items"]
expected = expected_page + expected_page + expected_page
assert res == expected
assert_detection_list_outputs_match_response_items(outputs_list, expected)


def test_highriskemployee_get_all_command_when_given_risk_tags_only_gets_employees_with_tags(
code42_high_risk_employee_mock
):
client = create_client(code42_high_risk_employee_mock)
_, _, res = highriskemployee_get_all_command(
_, outputs, res = highriskemployee_get_all_command(
client,
{"risktags": "PERFORMANCE_CONCERNS SUSPICIOUS_SYSTEM_ACTIVITY POOR_SECURITY_PRACTICES"},
)
outputs_list = outputs["Code42.HighRiskEmployee(val.UserID && val.UserID == obj.UserID)"]
# Only first employee has the given risk tags
expected = [json.loads(MOCK_GET_ALL_HIGH_RISK_EMPLOYEES_RESPONSE)["items"][0]]
assert res == expected
assert code42_high_risk_employee_mock.detectionlists.high_risk_employee.get_all.call_count == 1
assert_detection_list_outputs_match_response_items(outputs_list, expected)


def test_highriskemployee_get_all_command_gets_number_of_employees_equal_to_results_param(
Expand Down Expand Up @@ -1348,56 +1386,78 @@ def test_highriskemployee_get_all_command_when_no_employees(code42_high_risk_emp
no_employees_response
)
client = create_client(code42_high_risk_employee_mock)
_, _, res = highriskemployee_get_all_command(
_, outputs, res = highriskemployee_get_all_command(
client,
{
"risktags": "PERFORMANCE_CONCERNS SUSPICIOUS_SYSTEM_ACTIVITY POOR_SECURITY_PRACTICES"
},
)
outputs_list = outputs["Code42.HighRiskEmployee(val.UserID && val.UserID == obj.UserID)"]
# Only first employee has the given risk tags
expected = []
assert res == expected
assert code42_high_risk_employee_mock.detectionlists.high_risk_employee.get_all.call_count == 1
assert_detection_list_outputs_match_response_items(outputs_list, expected)


def test_highriskemployee_add_risk_tags_command(code42_sdk_mock):
tags = "FLIGHT_RISK"
client = create_client(code42_sdk_mock)
_, _, res = highriskemployee_add_risk_tags_command(
client, {"username": "user1@example.com", "risktags": "FLIGHT_RISK"}
_, outputs, res = highriskemployee_add_risk_tags_command(
client, {"username": _TEST_USERNAME, "risktags": "FLIGHT_RISK"}
)
expected_user_id = "123412341234123412" # value found in GET_USER_RESPONSE
assert res == expected_user_id
assert res == _TEST_USER_ID
assert outputs["Code42.HighRiskEmployee"]["UserID"] == _TEST_USER_ID
assert outputs["Code42.HighRiskEmployee"]["Username"] == _TEST_USERNAME
assert outputs["Code42.HighRiskEmployee"]["RiskTags"] == tags
code42_sdk_mock.detectionlists.add_user_risk_tags.assert_called_once_with(
expected_user_id, ["FLIGHT_RISK"]
_TEST_USER_ID, ["FLIGHT_RISK"]
)


def test_highriskemployee_remove_risk_tags_command(code42_sdk_mock):
client = create_client(code42_sdk_mock)
_, _, res = highriskemployee_remove_risk_tags_command(
client, {"username": "user1@example.com", "risktags": "FLIGHT_RISK CONTRACT_EMPLOYEE"}
_, outputs, res = highriskemployee_remove_risk_tags_command(
client, {"username": _TEST_USERNAME, "risktags": "FLIGHT_RISK CONTRACT_EMPLOYEE"}
)
expected_user_id = "123412341234123412" # value found in GET_USER_RESPONSE
assert res == expected_user_id
assert res == _TEST_USER_ID
assert outputs["Code42.HighRiskEmployee"]["UserID"] == _TEST_USER_ID
assert outputs["Code42.HighRiskEmployee"]["Username"] == _TEST_USERNAME
assert outputs["Code42.HighRiskEmployee"]["RiskTags"] == "FLIGHT_RISK CONTRACT_EMPLOYEE"
code42_sdk_mock.detectionlists.remove_user_risk_tags.assert_called_once_with(
expected_user_id, ["FLIGHT_RISK", "CONTRACT_EMPLOYEE"]
_TEST_USER_ID, ["FLIGHT_RISK", "CONTRACT_EMPLOYEE"]
)


def test_security_data_search_command(code42_file_events_mock):
client = create_client(code42_file_events_mock)
_, _, res = securitydata_search_command(client, MOCK_SECURITY_DATA_SEARCH_QUERY)
assert len(res) == 3
_, outputs, res = securitydata_search_command(client, MOCK_SECURITY_DATA_SEARCH_QUERY)
outputs_list = outputs["Code42.SecurityData(val.EventID && val.EventID == obj.EventID)"]
actual_query = code42_file_events_mock.securitydata.search_file_events.call_args[0][0]
filter_groups = json.loads(str(actual_query))["groups"]
assert filter_groups[0]["filters"][0]["term"] == "md5Checksum"
assert filter_groups[0]["filters"][0]["value"] == "d41d8cd98f00b204e9800998ecf8427e"
assert filter_groups[1]["filters"][0]["term"] == "osHostName"
assert filter_groups[1]["filters"][0]["value"] == "DESKTOP-0001"
assert filter_groups[2]["filters"][0]["term"] == "deviceUserName"
assert filter_groups[2]["filters"][0]["value"] == "user3@example.com"
assert filter_groups[3]["filters"][0]["term"] == "exposure"
assert filter_groups[3]["filters"][0]["value"] == "ApplicationRead"
expected_query_items = [
("md5Checksum", "d41d8cd98f00b204e9800998ecf8427e"),
("osHostName", "DESKTOP-0001"),
("deviceUserName", "user3@example.com"),
("exposure", "ApplicationRead")
]
expected_file_events = json.loads(MOCK_SECURITY_EVENT_RESPONSE)["fileEvents"]

# Assert that the correct query gets made
assert len(filter_groups) == len(expected_query_items)
for i in range(0, len(filter_groups)):
_filter = filter_groups[i]["filters"][0]
assert _filter["term"] == expected_query_items[i][0]
assert _filter["value"] == expected_query_items[i][1]

assert len(res) == len(outputs_list) == 3
assert res == expected_file_events

# Assert that the Outputs are mapped from the file events.
for i in range(0, len(expected_file_events)):
mapped_event = map_to_code42_event_context(expected_file_events[i])
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that this is a violation of good unit testing practices since it is calling this mapping method in the expected result --- but file events are very large.....

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this method also tested independently? If that method doesn't work but this method passes, that could lead to a hard-to-diagnose problem.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but if we have a different test that proves that it works, its probably okay to use here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that method is tested

output_item = outputs_list[i]
assert output_item == mapped_event


def test_fetch_when_no_significant_file_categories_ignores_filter(
Expand Down