diff --git a/simulation-system/libs/csle-tolerance/recovery_game.txt b/simulation-system/libs/csle-tolerance/recovery_game.txt new file mode 100644 index 000000000..da2443288 --- /dev/null +++ b/simulation-system/libs/csle-tolerance/recovery_game.txt @@ -0,0 +1,99 @@ +3 1 2 2 2 72 12 0.9 +0 0 +1 0 +2 0 +WAIT +RECOVER +FALSEALARM +ATTACK +o_0 +o_1 +0 1 +0 1 +0 1 +0 1 +0 0 0 0 0 0.06 +0 0 0 1 0 0.04000000000000001 +0 0 0 0 1 0.12 +0 0 0 1 1 0.08000000000000002 +0 0 0 0 2 0.42 +0 0 0 1 2 0.27999999999999997 +0 0 1 0 0 0.1 +0 0 1 1 0 0.1 +0 0 1 0 1 0.15 +0 0 1 1 1 0.15 +0 0 1 0 2 0.25 +0 0 1 1 2 0.25 +0 1 0 0 0 0.3 +0 1 0 1 0 0.2 +0 1 0 0 1 0.18 +0 1 0 1 1 0.12 +0 1 0 0 2 0.12 +0 1 0 1 2 0.08000000000000002 +0 1 1 0 0 0.15 +0 1 1 1 0 0.15 +0 1 1 0 1 0.2 +0 1 1 1 1 0.2 +0 1 1 0 2 0.15 +0 1 1 1 2 0.15 +1 0 0 0 0 0.18 +1 0 0 1 0 0.12 +1 0 0 0 1 0.24 +1 0 0 1 1 0.16000000000000003 +1 0 0 0 2 0.18 +1 0 0 1 2 0.12 +1 0 1 0 0 0.2 +1 0 1 1 0 0.2 +1 0 1 0 1 0.15 +1 0 1 1 1 0.15 +1 0 1 0 2 0.15 +1 0 1 1 2 0.15 +1 1 0 0 0 0.18 +1 1 0 1 0 0.12 +1 1 0 0 1 0.3 +1 1 0 1 1 0.2 +1 1 0 0 2 0.12 +1 1 0 1 2 0.08000000000000002 +1 1 1 0 0 0.25 +1 1 1 1 0 0.25 +1 1 1 0 1 0.2 +1 1 1 1 1 0.2 +1 1 1 0 2 0.05 +1 1 1 1 2 0.05 +2 0 0 0 0 0.12 +2 0 0 1 0 0.08000000000000002 +2 0 0 0 1 0.12 +2 0 0 1 1 0.08000000000000002 +2 0 0 0 2 0.36 +2 0 0 1 2 0.24 +2 0 1 0 0 0.15 +2 0 1 1 0 0.15 +2 0 1 0 1 0.15 +2 0 1 1 1 0.15 +2 0 1 0 2 0.2 +2 0 1 1 2 0.2 +2 1 0 0 0 0.24 +2 1 0 1 0 0.16000000000000003 +2 1 0 0 1 0.18 +2 1 0 1 1 0.12 +2 1 0 0 2 0.18 +2 1 0 1 2 0.12 +2 1 1 0 0 0.1 +2 1 1 1 0 0.1 +2 1 1 0 1 0.25 +2 1 1 1 1 0.25 +2 1 1 0 2 0.15 +2 1 1 1 2 0.15 +0 0 0 -1.0 +0 0 1 -1.0 +0 1 0 -4.0 +0 1 1 -4.0 +1 0 0 -2.0 +1 0 1 -2.0 +1 1 0 -5.0 +1 1 1 -5.0 +2 0 0 -3.0 +2 0 1 -3.0 +2 1 0 -6.0 +2 1 1 -6.0 +0 0.1 0.9 \ No newline at end of file diff --git a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py index 5bb8197bb..ccb05f4a7 100644 --- a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py +++ b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py @@ -522,7 +522,6 @@ def generate_transitions(game_config: IntrusionRecoveryGameConfig) -> List[str]: if prob > 0: transition = f"{s} {a1} {a2} {i} {s_prime} {prob}" transitions.append(transition) - return transitions @staticmethod diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_game_config.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_game_config.py index f7fef9f78..cdafcb510 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_game_config.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_game_config.py @@ -3,7 +3,6 @@ ) from csle_tolerance.util.intrusion_recovery_pomdp_util import IntrusionRecoveryPomdpUtil import pytest_mock -import numpy as np class TestIntrusionRecoveryGameConfigSuite: @@ -28,7 +27,7 @@ def test__init__(self) -> None: observations=[0, 1], cost_tensor=[[1, 2], [3, 4]], observation_tensor=[[1, 2], [3, 4]], - transition_tensor=[[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + transition_tensor=[[[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]]], b1=[0.1, 0.9], T=100, simulation_env_name="sim_env", @@ -48,8 +47,7 @@ def test__init__(self) -> None: assert dto.cost_tensor == [[1, 2], [3, 4]] assert dto.observation_tensor == [[1, 2], [3, 4]] assert dto.transition_tensor == [ - [[0.1, 0.2], [0.3, 0.4]], - [[0.5, 0.6], [0.7, 0.8]], + [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]] ] assert dto.b1 == [0.1, 0.9] assert dto.T == 100 @@ -74,7 +72,7 @@ def test__str__(self) -> None: observations=[0, 1], cost_tensor=[[1, 2], [3, 4]], observation_tensor=[[1, 2], [3, 4]], - transition_tensor=[[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + transition_tensor=[[[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]]], b1=[0.1, 0.9], T=100, simulation_env_name="sim_env", @@ -85,7 +83,7 @@ def test__str__(self) -> None: "eta: 0.5, p_a: 0.8, p_c_1: 0.1, BTR: 10, negate_costs: True, seed: 123, " "discount_factor: 0.9, states: [0, 1, 2], actions: [0, 1], observations: [[1, 2], [3, 4]], " "cost_tensor: [[1, 2], [3, 4]], observation_tensor: [[1, 2], [3, 4]], " - "transition_tensor: [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], b1:[0.1, 0.9], " + "transition_tensor: [[[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]]], b1:[0.1, 0.9], " "T: 100, simulation_env_name: sim_env, gym_env_name: gym_env, max_horizon: 1000" ) assert dto.__str__() == expected @@ -107,7 +105,7 @@ def test_from_dict(self) -> None: "observations": [0, 1], "cost_tensor": [[1, 2], [3, 4]], "observation_tensor": [[1, 2], [3, 4]], - "transition_tensor": [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + "transition_tensor": [[[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]]], "b1": [0.1, 0.9], "T": 100, "simulation_env_name": "sim_env", @@ -127,8 +125,7 @@ def test_from_dict(self) -> None: assert dto.cost_tensor == [[1, 2], [3, 4]] assert dto.observation_tensor == [[1, 2], [3, 4]] assert dto.transition_tensor == [ - [[0.1, 0.2], [0.3, 0.4]], - [[0.5, 0.6], [0.7, 0.8]], + [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]] ] assert dto.b1 == [0.1, 0.9] assert dto.T == 100 @@ -152,7 +149,7 @@ def test_to_dict(self) -> None: observations=[0, 1], cost_tensor=[[1, 2], [3, 4]], observation_tensor=[[1, 2], [3, 4]], - transition_tensor=[[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + transition_tensor=[[[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]]], b1=[0.1, 0.9], T=100, simulation_env_name="sim_env", @@ -171,7 +168,7 @@ def test_to_dict(self) -> None: "observations": [0, 1], "cost_tensor": [[1, 2], [3, 4]], "observation_tensor": [[1, 2], [3, 4]], - "transition_tensor": [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + "transition_tensor": [[[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]]], "b1": [0.1, 0.9], "T": 100, "simulation_env_name": "sim_env", @@ -186,7 +183,7 @@ def test_from_json_file(self, mocker: pytest_mock.MockFixture) -> None: eta = 2 p_a = 0.05 p_c_1 = 0.01 - BTR = np.inf + BTR = 100 negate_costs = False discount_factor = 1 - p_c_1 num_observations = 100 @@ -214,7 +211,7 @@ def test_from_json_file(self, mocker: pytest_mock.MockFixture) -> None: eta=eta, p_a=p_a, p_c_1=p_c_1, - BTR=BTR, + BTR=100, negate_costs=negate_costs, seed=999, discount_factor=discount_factor, @@ -227,7 +224,7 @@ def test_from_json_file(self, mocker: pytest_mock.MockFixture) -> None: observation_tensor=observation_tensor, transition_tensor=transition_tensor, b1=IntrusionRecoveryPomdpUtil.initial_belief(p_a=p_a), - T=BTR, + T=int(BTR), simulation_env_name=simulation_name, gym_env_name="csle-tolerance-intrusion-recovery-pomdp-v1", ) @@ -238,7 +235,7 @@ def test_from_json_file(self, mocker: pytest_mock.MockFixture) -> None: assert dto.eta == 2 assert dto.p_a == 0.05 assert dto.p_c_1 == 0.01 - assert dto.BTR == np.inf + assert dto.BTR == 100 assert dto.negate_costs is False assert dto.seed == 999 assert dto.discount_factor == 1 - p_c_1 diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_config.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_config.py index 0e9428958..dd18e3dad 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_config.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_config.py @@ -202,7 +202,7 @@ def test_from_json_file(self, mocker: pytest_mock.MockFixture) -> None: p_c_1 = 0.01 p_c_2 = 0.01 p_u = 0.0 - BTR = np.inf + BTR = 10 negate_costs = False discount_factor = 1 - p_c_1 num_observations = 100 @@ -259,7 +259,7 @@ def test_from_json_file(self, mocker: pytest_mock.MockFixture) -> None: assert dto.p_c_1 == 0.01 assert dto.p_c_2 == 0.01 assert dto.p_u == 0.0 - assert dto.BTR == np.inf + assert dto.BTR == BTR assert dto.negate_costs is False assert dto.seed == 999 assert dto.discount_factor == 1 - p_c_1 diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_env.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_env.py index 4f8f223d0..73d53a9fb 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_env.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_env.py @@ -7,6 +7,7 @@ import csle_common.constants.constants as constants import numpy as np import pytest +from typing import Dict, Any class TestInstrusionRecoveryPomdpEnvSuite: @@ -153,7 +154,7 @@ def test_info(self) -> None: gym_env_name="gym", max_horizon=np.inf, ) - info = {} + info: Dict[str, Any] = {} with pytest.raises(IndexError): assert IntrusionRecoveryPomdpEnv(config)._info(info) is not None @@ -247,7 +248,7 @@ def test_reset_traces(self) -> None: gym_env_name="gym", max_horizon=np.inf, ) - assert IntrusionRecoveryPomdpEnv(config).reset_traces() is None + assert not IntrusionRecoveryPomdpEnv(config).reset_traces() @patch("time.time") # Mock the time.time function @patch( diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py index 87a272aff..08ce57ba2 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py @@ -2,6 +2,9 @@ from csle_tolerance.dao.intrusion_recovery_pomdp_config import ( IntrusionRecoveryPomdpConfig, ) +from csle_tolerance.dao.intrusion_recovery_game_config import ( + IntrusionRecoveryGameConfig, +) import numpy as np import pytest @@ -133,6 +136,28 @@ def test_transition_function(self) -> None: == 0.2 ) + def test_transition_function_game(self) -> None: + """ + Tests the transition function of the POSG + + :return: None + """ + s = 0 + s_prime = 1 + a1 = 0 + a2 = 1 + p_a = 0.2 + p_c_1 = 0.1 + assert ( + round( + IntrusionRecoveryPomdpUtil.transition_function_game( + s, s_prime, a1, a2, p_a, p_c_1 + ), + 2, + ) + == 0.18 + ) + def test_transition_tensor(self) -> None: """ Tests the function of creating a tensor with the transition probabilities of the POMDP @@ -160,6 +185,31 @@ def test_transition_tensor(self) -> None: states, actions, p_a, p_c_1, p_c_2, p_u ) + def test_transition_tensor_game(self) -> None: + """ + Tests the function of creating a tensor with the transition probabilities of the POSG + + :return: None + """ + states = [0, 1, 2] + defender_actions = [0, 1] + attacker_actions = [0, 1] + p_a = 0.5 + p_c_1 = 0.3 + result = IntrusionRecoveryPomdpUtil.transition_tensor_game( + states, defender_actions, attacker_actions, p_a, p_c_1 + ) + assert len(result) == len(defender_actions) + assert all(len(a1) == len(attacker_actions) for a1 in result) + assert all(len(a2) == len(states) for a1 in result for a2 in a1) + assert all(len(s) == len(states) for a1 in result for a2 in a1 for s in a2) + + assert result[0][1][0][0] == (1 - p_a) * (1 - p_c_1) + assert result[1][0][1][1] == 0 + assert result[1][1][2][2] == 1.0 + assert result[0][1][0][1] == (1 - p_c_1) * p_a + assert result[0][0][0][2] == p_c_1 + def test_sample_initial_state(self) -> None: """ Tests the function of sampling the initial state @@ -311,3 +361,243 @@ def test_pomdp_solver_file(self) -> None: ) is not None ) + + def test_sample_next_state_game(self) -> None: + """ + Tests the function of sampling the next observation + + :return: None + """ + np.random.seed(40) + transition_tensor = [ + [ + [[0.1, 0.9], [0.7, 0.3], [0.5, 0.5]], + [[0.2, 0.8], [0.6, 0.4], [0.4, 0.6]], + ], + [ + [[0.3, 0.7], [0.8, 0.2], [0.6, 0.4]], + [[0.4, 0.6], [0.5, 0.5], [0.3, 0.7]], + ], + ] + + s = 0 + a1 = 0 + a2 = 0 + + count = [0, 0] + for _ in range(1000): + s_prime = IntrusionRecoveryPomdpUtil.sample_next_state_game( + transition_tensor, s, a1, a2 + ) + count[s_prime] += 1 + assert 850 <= count[1] <= 950 + assert 50 <= count[0] <= 150 + + def test_generate_transitions(self) -> None: + """ + Tests the function of generating the transition rows of the POSG config file of HSVI + + :return: None + """ + dto = IntrusionRecoveryGameConfig( + eta=0.5, + p_a=0.8, + p_c_1=0.1, + BTR=10, + negate_costs=True, + seed=123, + discount_factor=0.9, + states=[0, 1, 2], + actions=[0, 1], + observations=[0, 1], + cost_tensor=[[1, 2], [3, 4]], + observation_tensor=[[0.6, 0.4], [0.5, 0.5]], + transition_tensor=[ + [ + [ + [0.1, 0.2, 0.7], + [0.3, 0.4, 0.3], + [0.2, 0.2, 0.6], + ], + [ + [0.2, 0.3, 0.5], + [0.4, 0.3, 0.3], + [0.3, 0.3, 0.4], + ], + ], + [ + [ + [0.5, 0.3, 0.2], + [0.3, 0.5, 0.2], + [0.4, 0.3, 0.3], + ], + [ + [0.3, 0.4, 0.3], + [0.5, 0.4, 0.1], + [0.2, 0.5, 0.3], + ], + ], + ], + b1=[0.1, 0.9], + T=100, + simulation_env_name="sim_env", + gym_env_name="gym_env", + max_horizon=1000, + ) + assert ( + IntrusionRecoveryPomdpUtil.generate_transitions(dto)[0] == "0 0 0 0 0 0.06" + ) + + def test_generate_rewards(self) -> None: + """ + Tests the function of generating the reward rows of the POSG config file of HSVI + + :return: None + """ + dto = IntrusionRecoveryGameConfig( + eta=0.5, + p_a=0.8, + p_c_1=0.1, + BTR=10, + negate_costs=True, + seed=123, + discount_factor=0.9, + states=[0, 1, 2], + actions=[0, 1], + observations=[0, 1], + cost_tensor=[[1, 2, 3], [4, 5, 6]], + observation_tensor=[[0.6, 0.4], [0.5, 0.5]], + transition_tensor=[ + [ + [ + [0.1, 0.2, 0.7], + [0.3, 0.4, 0.3], + [0.2, 0.2, 0.6], + ], + [ + [0.2, 0.3, 0.5], + [0.4, 0.3, 0.3], + [0.3, 0.3, 0.4], + ], + ], + [ + [ + [0.5, 0.3, 0.2], + [0.3, 0.5, 0.2], + [0.4, 0.3, 0.3], + ], + [ + [0.3, 0.4, 0.3], + [0.5, 0.4, 0.1], + [0.2, 0.5, 0.3], + ], + ], + ], + b1=[0.1, 0.9], + T=100, + simulation_env_name="sim_env", + gym_env_name="gym_env", + max_horizon=1000, + ) + assert IntrusionRecoveryPomdpUtil.generate_rewards(dto)[0] == "0 0 0 -1" + + def test_generate_os_posg_game_file(self) -> None: + """ """ + + states = [0, 1, 2] + actions = [0, 1] + observations = [0, 1] + + transition_tensor = [ + [ + [ + [0.1, 0.2, 0.7], + [0.3, 0.4, 0.3], + [0.2, 0.2, 0.6], + ], + [ + [0.2, 0.3, 0.5], + [0.4, 0.3, 0.3], + [0.3, 0.3, 0.4], + ], + ], + [ + [ + [0.5, 0.3, 0.2], + [0.3, 0.5, 0.2], + [0.4, 0.3, 0.3], + ], + [ + [0.3, 0.4, 0.3], + [0.5, 0.4, 0.1], + [0.2, 0.5, 0.3], + ], + ], + ] + + observation_tensor = [ + [0.6, 0.4], + [0.5, 0.5], + ] + + cost_tensor = [ + [1.0, 2.0, 3.0], + [4.0, 5.0, 6.0], + ] + + game_config = IntrusionRecoveryGameConfig( + eta=0.5, + p_a=0.8, + p_c_1=0.1, + BTR=10, + negate_costs=True, + seed=123, + discount_factor=0.9, + states=states, + actions=actions, + observations=observations, + cost_tensor=cost_tensor, + observation_tensor=observation_tensor, + transition_tensor=transition_tensor, + b1=[0.1, 0.9], + T=100, + simulation_env_name="sim_env", + gym_env_name="gym_env", + max_horizon=1000, + ) + + game_file_str = IntrusionRecoveryPomdpUtil.generate_os_posg_game_file( + game_config + ) + + expected_game_description = "3 1 2 2 2 72 12 0.9" + expected_state_descriptions = ["0 0", "1 0", "2 0"] + expected_player_1_actions = ["WAIT", "RECOVER"] + expected_player_2_actions = ["FALSEALARM", "ATTACK"] + expected_obs_descriptions = ["o_0", "o_1"] + expected_player_2_legal_actions = ["0 1", "0 1", "0 1"] + expected_player_1_legal_actions = ["0 1"] + + output_lines = game_file_str.split("\n") + + assert ( + output_lines[0] == expected_game_description + ), f"Game description mismatch: {output_lines[0]}" + assert ( + output_lines[1:4] == expected_state_descriptions + ), f"State descriptions mismatch: {output_lines[1:4]}" + assert ( + output_lines[4:6] == expected_player_1_actions + ), f"Player 1 actions mismatch: {output_lines[4:6]}" + assert ( + output_lines[6:8] == expected_player_2_actions + ), f"Player 2 actions mismatch: {output_lines[6:8]}" + assert ( + output_lines[8:10] == expected_obs_descriptions + ), f"Observation descriptions mismatch: {output_lines[8:10]}" + assert ( + output_lines[10:13] == expected_player_2_legal_actions + ), f"Player 2 legal actions mismatch: {output_lines[10:13]}" + assert ( + output_lines[13:14] == expected_player_1_legal_actions + ), f"Player 1 legal actions mismatch: {output_lines[13:14]}" diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py index 06f3043e0..562485750 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py @@ -57,8 +57,8 @@ def test_constraint_cost_function(self) -> None: :return: None """ states = [1, 2] - f = 0.5 - expected = [0.0, 1.0] + f = 5 + expected = [0.0, 0.0] assert ( IntrusionResponseCmdpUtil.constraint_cost_function(states[0], f) == expected[0] @@ -75,8 +75,8 @@ def test_constraint_cost_tensor(self) -> None: :return: None """ states = [1, 2] - f = 0.5 - expected = [0.0, 1.0] + f = 5 + expected = [0.0, 0.0] assert IntrusionResponseCmdpUtil.constraint_cost_tensor(states, f) == expected def test_delta_function(self) -> None: diff --git a/simulation-system/libs/gym-csle-stopping-game/src/gym_csle_stopping_game/envs/stopping_game_env.py b/simulation-system/libs/gym-csle-stopping-game/src/gym_csle_stopping_game/envs/stopping_game_env.py index decd1781e..53a757972 100644 --- a/simulation-system/libs/gym-csle-stopping-game/src/gym_csle_stopping_game/envs/stopping_game_env.py +++ b/simulation-system/libs/gym-csle-stopping-game/src/gym_csle_stopping_game/envs/stopping_game_env.py @@ -42,7 +42,6 @@ def __init__(self, config: StoppingGameConfig): # Initialize environment state self.state = StoppingGameState(b1=self.config.b1, L=self.config.L) - # Setup spaces self.attacker_observation_space = self.config.attacker_observation_space() self.defender_observation_space = self.config.defender_observation_space() @@ -437,6 +436,8 @@ def get_observation_from_history(self, history: List[int], pi2: npt.NDArray[Any] :param l: the number of stops remaining :return: the observation """ + if not history: + raise ValueError("History must not be empty") return [history[-1]] def generate_random_particles(self, o: int, num_particles: int) -> List[int]: diff --git a/simulation-system/libs/gym-csle-stopping-game/tests/test_stopping_game_env.py b/simulation-system/libs/gym-csle-stopping-game/tests/test_stopping_game_env.py new file mode 100644 index 000000000..12b457457 --- /dev/null +++ b/simulation-system/libs/gym-csle-stopping-game/tests/test_stopping_game_env.py @@ -0,0 +1,480 @@ +from gym_csle_stopping_game.envs.stopping_game_env import StoppingGameEnv +from gym_csle_stopping_game.dao.stopping_game_config import StoppingGameConfig +from gym_csle_stopping_game.dao.stopping_game_state import StoppingGameState +from csle_common.constants import constants +from unittest.mock import patch, MagicMock +from gym.spaces import Box, Discrete +import pytest +import numpy as np + + +class TestStoppingGameEnvSuite: + """ + Test suite for stopping_game_env.py + """ + + @pytest.fixture(autouse=True) + def setup_env(self) -> None: + """ + Sets up the configuration of the stopping game + + :return: None + """ + env_name = "test_env" + T = np.array([[[0.1, 0.9], [0.4, 0.6]], [[0.7, 0.3], [0.2, 0.8]]]) + O = np.array([0, 1]) + Z = np.array([[[0.8, 0.2], [0.5, 0.5]], [[0.4, 0.6], [0.9, 0.1]]]) + R = np.zeros((2, 3, 3, 3)) + S = np.array([0, 1, 2]) + A1 = np.array([0, 1, 2]) + A2 = np.array([0, 1, 2]) + L = 2 + R_INT = 1 + R_COST = 2 + R_SLA = 3 + R_ST = 4 + b1 = np.array([0.6, 0.4]) + save_dir = "save_directory" + checkpoint_traces_freq = 100 + gamma = 0.9 + compute_beliefs = True + save_trace = True + self.config = StoppingGameConfig( + env_name, + T, + O, + Z, + R, + S, + A1, + A2, + L, + R_INT, + R_COST, + R_SLA, + R_ST, + b1, + save_dir, + checkpoint_traces_freq, + gamma, + compute_beliefs, + save_trace, + ) + + def test_stopping_game_init_(self) -> None: + """ + Tests the initializing function + + :return: None + """ + T = np.array([[[0.1, 0.9], [0.4, 0.6]], [[0.7, 0.3], [0.2, 0.8]]]) + O = np.array([0, 1]) + A1 = np.array([0, 1, 2]) + A2 = np.array([0, 1, 2]) + L = 2 + b1 = np.array([0.6, 0.4]) + attacker_observation_space = Box( + low=np.array([0.0, 0.0, 0.0]), + high=np.array([float(L), 1.0, 2.0]), + dtype=np.float64, + ) + defender_observation_space = Box( + low=np.array([0.0, 0.0]), + high=np.array([float(L), 1.0]), + dtype=np.float64, + ) + attacker_action_space = Discrete(len(A2)) + defender_action_space = Discrete(len(A1)) + + assert self.config.T.any() == T.any() + assert self.config.O.any() == O.any() + assert self.config.b1.any() == b1.any() + assert self.config.L == L + + env = StoppingGameEnv(self.config) + assert env.config == self.config + assert ( + env.attacker_observation_space.low.any() + == attacker_observation_space.low.any() + ) + assert ( + env.defender_observation_space.low.any() + == defender_observation_space.low.any() + ) + assert env.attacker_action_space.n == attacker_action_space.n + assert env.defender_action_space.n == defender_action_space.n + assert env.traces == [] + + with patch( + "gym_csle_stopping_game.dao.stopping_game_state.StoppingGameState" + ) as MockStoppingGameState: + MockStoppingGameState(b1=self.config.b1, L=self.config.L) + with patch( + "gym_csle_stopping_game.util.stopping_game_util.StoppingGameUtil.sample_initial_state" + ) as MockSampleInitialState: + MockSampleInitialState.return_value = 0 + env = StoppingGameEnv(self.config) + MockSampleInitialState.assert_called() + MockStoppingGameState.assert_called_once_with( + b1=self.config.b1, L=self.config.L + ) + + with patch( + "csle_common.dao.simulation_config.simulation_trace.SimulationTrace" + ) as MockSimulationTrace: + mock_trace = MockSimulationTrace(self.config.env_name).return_value + print(mock_trace) + env = StoppingGameEnv(self.config) + print(env.trace) + MockSimulationTrace.assert_called_once_with(self.config.env_name) + + def test_mean(self) -> None: + """ + Tests the utility function for getting the mean of a vector + + :return: None + """ + test_cases = [ + ([], 0), # Test case for an empty vector + ([5], 0), # Test case for a vector with a single element + ([0.2, 0.3, 0.5], 1.3), # Test case for a vector with multiple elements + ] + for prob_vector, expected_mean in test_cases: + result = StoppingGameEnv(self.config).mean(prob_vector) + assert result == expected_mean + + def test_weighted_intrusion_prediction_distance(self) -> None: + """ + Tests the function of computing the weighed intrusion start time prediction distance + """ + # Test case when first_stop is before intrusion_start + result1 = StoppingGameEnv(self.config).weighted_intrusion_prediction_distance( + 5, 3 + ) + assert result1 == 0 + + # Test case when first_stop is after intrusion_start + result2 = StoppingGameEnv(self.config).weighted_intrusion_prediction_distance( + 3, 5 + ) + assert result2 == 0.95 + + # Test case when first_stop is equal to intrusion_start + result3 = StoppingGameEnv(self.config).weighted_intrusion_prediction_distance( + 3, 3 + ) + assert result3 == 0 + + def test_reset(self) -> None: + """ + Tests the reset function for reseting the environment state + + :return: None + """ + env = StoppingGameEnv(self.config) + env.state = MagicMock() + env.state.l = 10 + env.state.s = "initial_state" + env.state.t = 0 + env.state.attacker_observation.return_value = np.array([1, 2, 3]) + env.state.defender_observation.return_value = np.array([4, 5, 6]) + + env.trace = MagicMock() + env.trace.attacker_rewards = [1] + env.traces = [] + # Call the reset method + observation, info = env.reset() + # Assertions + assert env.state.reset.called, "State's reset method was not called." + assert ( + env.trace.simulation_env == self.config.env_name + ), "Trace was not initialized correctly." + assert ( + observation[0].all() == np.array([4, 5, 6]).all() + ), "Observation does not match expected values." + + assert ( + info[env_constants.ENV_METRICS.STOPS_REMAINING] == env.state.l + ), "Stops remaining does not match expected value." + assert ( + info[env_constants.ENV_METRICS.STATE] == env.state.s + ), "State info does not match expected value." + assert ( + info[env_constants.ENV_METRICS.OBSERVATION] == 0 + ), "Observation info does not match expected value." + assert ( + info[env_constants.ENV_METRICS.TIME_STEP] == env.state.t + ), "Time step info does not match expected value." + + # Check if trace was appended correctly + if len(env.trace.attacker_rewards) > 0: + assert env.traces[-1] == env.trace, "Trace was not appended correctly." + + def test_render(self) -> None: + """ + Tests the function of rendering the environment + + :return: None + """ + with pytest.raises(NotImplementedError): + StoppingGameEnv(self.config).render() + + def test_is_defense_action_legal(self) -> None: + """ + Tests the function of checking whether a defender action in the environment is legal or not + + :return: None + """ + assert StoppingGameEnv(self.config).is_defense_action_legal(1) + + def test_is_attack_action_legal(self) -> None: + """ + Tests the function of checking whether an attacker action in the environment is legal or not + + :return: None + """ + assert StoppingGameEnv(self.config).is_attack_action_legal(1) + + def test_get_traces(self) -> None: + """ + Tests the function of getting the list of simulation traces + + :return: None + """ + assert ( + StoppingGameEnv(self.config).get_traces() + == StoppingGameEnv(self.config).traces + ) + + def test_reset_traces(self) -> None: + """ + Tests the function of resetting the list of traces + + :return: None + """ + env = StoppingGameEnv(self.config) + env.traces = ["trace1", "trace2"] + env.reset_traces() + assert env.traces == [] + + def test_checkpoint_traces(self) -> None: + """ + Tests the function of checkpointing agent traces + + :return: None + """ + env = StoppingGameEnv(self.config) + fixed_timestamp = 123 + with patch("time.time", return_value=fixed_timestamp): + with patch( + "csle_common.dao.simulation_config.simulation_trace.SimulationTrace.save_traces" + ) as mock_save_traces: + env.traces = ["trace1", "trace2"] + env._StoppingGameEnv__checkpoint_traces() + mock_save_traces.assert_called_once_with( + traces_save_dir=constants.LOGGING.DEFAULT_LOG_DIR, + traces=env.traces, + traces_file=f"taus{fixed_timestamp}.json", + ) + + def test_set_model(self) -> None: + """ + Tests the function of setting the model + + :return: None + """ + env = StoppingGameEnv(self.config) + mock_model = MagicMock() + env.set_model(mock_model) + assert env.model == mock_model + + def test_set_state(self) -> None: + """ + Tests the function of setting the state + + :return: None + """ + env = StoppingGameEnv(self.config) + env.state = MagicMock() + + mock_state = MagicMock(spec=StoppingGameState) + env.set_state(mock_state) + assert env.state == mock_state + + state_int = 5 + env.set_state(state_int) + assert env.state.s == state_int + assert env.state.l == self.config.L + + state_tuple = (3, 7) + env.set_state(state_tuple) + assert env.state.s == state_tuple[0] + assert env.state.l == state_tuple[1] + + with pytest.raises(ValueError): + env.set_state([1, 2, 3]) + + def test_is_state_terminal(self) -> None: + """ + Tests the function of checking whether a given state is terminal or not + + :return: None + """ + env = StoppingGameEnv(self.config) + env.state = MagicMock() + + mock_state = MagicMock(spec=StoppingGameState) + mock_state.s = 2 + assert env.is_state_terminal(mock_state) + mock_state.s = 1 + assert not env.is_state_terminal(mock_state) + state_int = 2 + assert env.is_state_terminal(state_int) + state_int = 1 + assert not env.is_state_terminal(state_int) + state_tuple = (2, 5) + assert env.is_state_terminal(state_tuple) + state_tuple = (1, 5) + assert not env.is_state_terminal(state_tuple) + + with pytest.raises(ValueError): + env.is_state_terminal([1, 2, 3]) + + def test_get_observation_from_history(self) -> None: + """ + Tests the function of getting a hidden observation based on a history + + :return: None + """ + env = StoppingGameEnv(self.config) + history = [1, 2, 3, 4, 5] + pi2 = np.array([0.1, 0.9]) + l = 3 + observation = env.get_observation_from_history(history, pi2, l) + assert observation == [5] + + history = [] + with pytest.raises(ValueError, match="History must not be empty"): + env.get_observation_from_history(history, pi2, l) + + def test_generate_random_particles(self) -> None: + """ + Tests the funtion of generating a random list of state particles from a given observation + + :return: None + """ + env = StoppingGameEnv(self.config) + num_particles = 10 + particles = env.generate_random_particles(o=1, num_particles=num_particles) + assert len(particles) == num_particles + assert all(p in [0, 1] for p in particles) + + num_particles = 0 + particles = env.generate_random_particles(o=1, num_particles=num_particles) + assert len(particles) == num_particles + + def test_step(self) -> None: + """ + Tests the funtion of taking a step in the environment by executing the given action + + :return: None + """ + env = StoppingGameEnv(self.config) + env.state = MagicMock() + env.state.s = 1 + env.state.l = 2 + env.state.t = 0 + env.state.attacker_observation.return_value = np.array([1, 2, 3]) + env.state.defender_observation.return_value = np.array([4, 5, 6]) + env.state.b = np.array([0.5, 0.5, 0.0]) + + env.trace = MagicMock() + env.trace.defender_rewards = [] + env.trace.attacker_rewards = [] + env.trace.attacker_actions = [] + env.trace.defender_actions = [] + env.trace.infos = [] + env.trace.states = [] + env.trace.beliefs = [] + env.trace.infrastructure_metrics = [] + env.trace.attacker_observations = [] + env.trace.defender_observations = [] + + with patch( + "gym_csle_stopping_game.util.stopping_game_util.StoppingGameUtil.sample_next_state", + return_value=2, + ): + with patch( + "gym_csle_stopping_game.util.stopping_game_util.StoppingGameUtil.sample_next_observation", + return_value=1, + ): + with patch( + "gym_csle_stopping_game.util.stopping_game_util.StoppingGameUtil.next_belief", + return_value=np.array([0.3, 0.7, 0.0]), + ): + action_profile = ( + 1, + ( + np.array( + [[0.2, 0.8, 0.0], [0.6, 0.4, 0.0], [0.5, 0.5, 0.0]] + ), + 2, + ), + ) + observations, rewards, terminated, truncated, info = env.step( + action_profile + ) + + assert ( + observations[0] == np.array([4, 5, 6]) + ).all(), "Incorrect defender observations" + assert ( + observations[1] == np.array([1, 2, 3]) + ).all(), "Incorrect attacker observations" + assert rewards == (0, 0) + assert not terminated + assert not truncated + assert env.trace.defender_rewards[-1] == 0 + assert env.trace.attacker_rewards[-1] == 0 + assert env.trace.attacker_actions[-1] == 2 + assert env.trace.defender_actions[-1] == 1 + assert env.trace.infos[-1] == info + assert env.trace.states[-1] == 2 + print(env.trace.beliefs) + assert env.trace.beliefs[-1] == 0.7 + assert env.trace.infrastructure_metrics[-1] == 1 + assert ( + env.trace.attacker_observations[-1] == np.array([1, 2, 3]) + ).all() + assert ( + env.trace.defender_observations[-1] == np.array([4, 5, 6]) + ).all() + + def test_info(self) -> None: + """ + Tests the function of adding the cumulative reward and episode length to the info dict + + :return: None + """ + env = StoppingGameEnv(self.config) + env.trace = MagicMock() + env.trace.defender_rewards = [1, 2] + env.trace.attacker_actions = [0, 1] + env.trace.defender_actions = [0, 1] + env.trace.states = [0, 1] + env.trace.infrastructure_metrics = [0, 1] + + info = {} + updated_info = env._info(info) + print(updated_info) + assert updated_info[env_constants.ENV_METRICS.RETURN] == sum(env.trace.defender_rewards) + + def test_emulation_evaluation(self) -> None: + """ + Tests the function for evaluating a strategy profile in the emulation environment + + :return: None + """ + env = StoppingGameEnv(self.config) + env.state.b1 = [0.5, 0.5] + pass \ No newline at end of file diff --git a/simulation-system/libs/gym-csle-stopping-game/tests/test_stopping_game_mdp_attacker_env.py b/simulation-system/libs/gym-csle-stopping-game/tests/test_stopping_game_mdp_attacker_env.py new file mode 100644 index 000000000..1eb438281 --- /dev/null +++ b/simulation-system/libs/gym-csle-stopping-game/tests/test_stopping_game_mdp_attacker_env.py @@ -0,0 +1,343 @@ +from gym_csle_stopping_game.envs.stopping_game_mdp_attacker_env import ( + StoppingGameMdpAttackerEnv, +) +from gym_csle_stopping_game.dao.stopping_game_config import StoppingGameConfig +from gym_csle_stopping_game.dao.stopping_game_attacker_mdp_config import ( + StoppingGameAttackerMdpConfig, +) +from gym_csle_stopping_game.envs.stopping_game_env import StoppingGameEnv +from csle_common.dao.training.policy import Policy +import pytest +from unittest.mock import MagicMock +import numpy as np + + +class TestStoppingGameMdpAttackerEnvSuite: + """ + Test suite for stopping_game_mdp_attacker_env.py + """ + + @pytest.fixture(autouse=True) + def setup_env(self) -> None: + """ + Sets up the configuration of the stopping game + + :return: None + """ + env_name = "test_env" + T = np.array([[[0.1, 0.9], [0.4, 0.6]], [[0.7, 0.3], [0.2, 0.8]]]) + O = np.array([0, 1]) + Z = np.array([[[0.8, 0.2], [0.5, 0.5]], [[0.4, 0.6], [0.9, 0.1]]]) + R = np.zeros((2, 3, 3, 3)) + S = np.array([0, 1, 2]) + A1 = np.array([0, 1, 2]) + A2 = np.array([0, 1, 2]) + L = 2 + R_INT = 1 + R_COST = 2 + R_SLA = 3 + R_ST = 4 + b1 = np.array([0.6, 0.4]) + save_dir = "save_directory" + checkpoint_traces_freq = 100 + gamma = 0.9 + compute_beliefs = True + save_trace = True + self.config = StoppingGameConfig( + env_name, + T, + O, + Z, + R, + S, + A1, + A2, + L, + R_INT, + R_COST, + R_SLA, + R_ST, + b1, + save_dir, + checkpoint_traces_freq, + gamma, + compute_beliefs, + save_trace, + ) + + def test_init_(self) -> None: + """ + Tests the initializing function + + :return: None + """ + # Mock the defender strategy + defender_strategy = MagicMock(spec=Policy) + # Create the attacker MDP configuration + attacker_mdp_config = StoppingGameAttackerMdpConfig( + env_name="test_env", + stopping_game_config=self.config, + defender_strategy=defender_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + # Initialize the StoppingGameMdpAttackerEnv + env = StoppingGameMdpAttackerEnv(config=attacker_mdp_config) + assert env.config == attacker_mdp_config + assert env.observation_space == self.config.attacker_observation_space() + assert env.action_space == self.config.attacker_action_space() + assert env.static_defender_strategy == defender_strategy + # print(env.latest_defender_obs) + # assert not env.latest_defender_obs + # assert not env.latest_attacker_obs + assert not env.model + assert not env.viewer + + def test_reset(self) -> None: + """ + Tests the function for reseting the environment state + + :return: None + """ + defender_strategy = MagicMock(spec=Policy) + attacker_mdp_config = StoppingGameAttackerMdpConfig( + env_name="test_env", + stopping_game_config=self.config, + defender_strategy=defender_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + + env = StoppingGameMdpAttackerEnv(config=attacker_mdp_config) + attacker_obs, info = env.reset() + assert env.latest_defender_obs.all() == np.array([2, 0.4]).all() + assert info == {} + + def test_set_model(self) -> None: + """ + Tests the function for setting the model + + :return: None + """ + defender_strategy = MagicMock(spec=Policy) + attacker_mdp_config = StoppingGameAttackerMdpConfig( + env_name="test_env", + stopping_game_config=self.config, + defender_strategy=defender_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + env = StoppingGameMdpAttackerEnv(config=attacker_mdp_config) + mock_model = MagicMock() + env.set_model(mock_model) + assert env.model == mock_model + + def test_set_state(self) -> None: + """ + Tests the function for setting the state + + :return: None + """ + defender_strategy = MagicMock(spec=Policy) + attacker_mdp_config = StoppingGameAttackerMdpConfig( + env_name="test_env", + stopping_game_config=self.config, + defender_strategy=defender_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + + env = StoppingGameMdpAttackerEnv(config=attacker_mdp_config) + assert not env.set_state(1) + + def test_calculate_stage_policy(self) -> None: + """ + Tests the function for calculating the stage policy of a given model and observation + + :return: None + """ + defender_strategy = MagicMock(spec=Policy) + attacker_mdp_config = StoppingGameAttackerMdpConfig( + env_name="test_env", + stopping_game_config=self.config, + defender_strategy=defender_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + + env = StoppingGameMdpAttackerEnv(config=attacker_mdp_config) + env.model = None + observation = [1, 0.5] + stage_policy = env.calculate_stage_policy(o=observation) + expected_stage_policy = np.array([[1.0, 0.0], [1.0, 0.0], [0.5, 0.5]]) + assert stage_policy.all() == expected_stage_policy.all() + + def test_get_attacker_dist(self) -> None: + """ + Tests the function for getting the attacker's action distribution based on a given observation + + :return: None + """ + defender_strategy = MagicMock(spec=Policy) + attacker_mdp_config = StoppingGameAttackerMdpConfig( + env_name="test_env", + stopping_game_config=self.config, + defender_strategy=defender_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + + env = StoppingGameMdpAttackerEnv(config=attacker_mdp_config) + env.model = None + observation = [1, 0.5, 0] + with pytest.raises(ValueError, match="Model is None"): + env._get_attacker_dist(observation) + + def test_render(self) -> None: + """ + Tests the function for rendering the environment + + :return: None + """ + defender_strategy = MagicMock(spec=Policy) + attacker_mdp_config = StoppingGameAttackerMdpConfig( + env_name="test_env", + stopping_game_config=self.config, + defender_strategy=defender_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + + env = StoppingGameMdpAttackerEnv(config=attacker_mdp_config) + with pytest.raises(NotImplementedError): + env.render("human") + + def test_is_defense_action_legal(self) -> None: + """ + Tests the function of checking whether a defender action in the environment is legal or not + + :return: None + """ + defender_strategy = MagicMock(spec=Policy) + attacker_mdp_config = StoppingGameAttackerMdpConfig( + env_name="test_env", + stopping_game_config=self.config, + defender_strategy=defender_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + + env = StoppingGameMdpAttackerEnv(config=attacker_mdp_config) + assert env.is_defense_action_legal(1) + + def test_is_attack_action_legal(self) -> None: + """ + Tests the function of checking whether an attacker action in the environment is legal or not + + :return: None + """ + defender_strategy = MagicMock(spec=Policy) + attacker_mdp_config = StoppingGameAttackerMdpConfig( + env_name="test_env", + stopping_game_config=self.config, + defender_strategy=defender_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + + env = StoppingGameMdpAttackerEnv(config=attacker_mdp_config) + assert env.is_attack_action_legal(1) + + def test_get_traces(self) -> None: + """ + Tests the function of getting the list of simulation traces + + :return: None + """ + defender_strategy = MagicMock(spec=Policy) + attacker_mdp_config = StoppingGameAttackerMdpConfig( + env_name="test_env", + stopping_game_config=self.config, + defender_strategy=defender_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + + env = StoppingGameMdpAttackerEnv(config=attacker_mdp_config) + assert env.get_traces() == StoppingGameEnv(self.config).traces + + def test_reset_traces(self) -> None: + """ + Tests the function of resetting the list of traces + + :return: None + """ + defender_strategy = MagicMock(spec=Policy) + attacker_mdp_config = StoppingGameAttackerMdpConfig( + env_name="test_env", + stopping_game_config=self.config, + defender_strategy=defender_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + + env = StoppingGameMdpAttackerEnv(config=attacker_mdp_config) + env.traces = ["trace1", "trace2"] + env.reset_traces() + assert StoppingGameEnv(self.config).traces == [] + + def test_generate_random_particles(self) -> None: + """ + Tests the funtion of generating a random list of state particles from a given observation + + :return: None + """ + defender_strategy = MagicMock(spec=Policy) + attacker_mdp_config = StoppingGameAttackerMdpConfig( + env_name="test_env", + stopping_game_config=self.config, + defender_strategy=defender_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + + env = StoppingGameMdpAttackerEnv(config=attacker_mdp_config) + num_particles = 10 + particles = env.generate_random_particles(o=1, num_particles=num_particles) + assert len(particles) == num_particles + assert all(p in [0, 1] for p in particles) + + num_particles = 0 + particles = env.generate_random_particles(o=1, num_particles=num_particles) + assert len(particles) == num_particles + + def test_get_actions_from_particles(self) -> None: + """ + Tests the function for pruning the set of actions based on the current particle set + + :return: None + """ + defender_strategy = MagicMock(spec=Policy) + attacker_mdp_config = StoppingGameAttackerMdpConfig( + env_name="test_env", + stopping_game_config=self.config, + defender_strategy=defender_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + + env = StoppingGameMdpAttackerEnv(config=attacker_mdp_config) + particles = [1, 2, 3] + t = 0 + observation = 0 + expected_actions = [0, 1, 2] + assert ( + env.get_actions_from_particles(particles, t, observation) + == expected_actions + ) + + def test_step(self) -> None: + """ + Tests the function for taking a step in the environment by executing the given action + + :return: None + """ + defender_strategy = MagicMock(spec=Policy) + attacker_mdp_config = StoppingGameAttackerMdpConfig( + env_name="test_env", + stopping_game_config=self.config, + defender_strategy=defender_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + + env = StoppingGameMdpAttackerEnv(config=attacker_mdp_config) + pi2 = np.array([[0.5, 0.5]]) + with pytest.raises(AssertionError): + env.step(pi2) diff --git a/simulation-system/libs/gym-csle-stopping-game/tests/test_stopping_game_pomdp_defender_env.py b/simulation-system/libs/gym-csle-stopping-game/tests/test_stopping_game_pomdp_defender_env.py new file mode 100644 index 000000000..fbc04c500 --- /dev/null +++ b/simulation-system/libs/gym-csle-stopping-game/tests/test_stopping_game_pomdp_defender_env.py @@ -0,0 +1,328 @@ +from gym_csle_stopping_game.envs.stopping_game_pomdp_defender_env import ( + StoppingGamePomdpDefenderEnv, +) +from gym_csle_stopping_game.dao.stopping_game_config import StoppingGameConfig +from gym_csle_stopping_game.dao.stopping_game_defender_pomdp_config import ( + StoppingGameDefenderPomdpConfig, +) +from gym_csle_stopping_game.envs.stopping_game_env import StoppingGameEnv +from csle_common.dao.training.policy import Policy +import pytest +from unittest.mock import MagicMock +import numpy as np + + +class TestStoppingGamePomdpDefenderEnvSuite: + """ + Test suite for stopping_game_pomdp_defender_env.py + """ + + @pytest.fixture(autouse=True) + def setup_env(self) -> None: + """ + Sets up the configuration of the stopping game + + :return: None + """ + env_name = "test_env" + T = np.array([[[0.1, 0.9], [0.4, 0.6]], [[0.7, 0.3], [0.2, 0.8]]]) + O = np.array([0, 1]) + Z = np.array([[[0.8, 0.2], [0.5, 0.5]], [[0.4, 0.6], [0.9, 0.1]]]) + R = np.zeros((2, 3, 3, 3)) + S = np.array([0, 1, 2]) + A1 = np.array([0, 1, 2]) + A2 = np.array([0, 1, 2]) + L = 2 + R_INT = 1 + R_COST = 2 + R_SLA = 3 + R_ST = 4 + b1 = np.array([0.6, 0.4]) + save_dir = "save_directory" + checkpoint_traces_freq = 100 + gamma = 0.9 + compute_beliefs = True + save_trace = True + self.config = StoppingGameConfig( + env_name, + T, + O, + Z, + R, + S, + A1, + A2, + L, + R_INT, + R_COST, + R_SLA, + R_ST, + b1, + save_dir, + checkpoint_traces_freq, + gamma, + compute_beliefs, + save_trace, + ) + + def test_init_(self) -> None: + """ + Tests the initializing function + + :return: None + """ + # Mock the attacker strategy + attacker_strategy = MagicMock(spec=Policy) + # Create the defender POMDP configuration + defender_pomdp_config = StoppingGameDefenderPomdpConfig( + env_name="test_env", + stopping_game_config=self.config, + attacker_strategy=attacker_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + # Initialize the StoppingGamePomdpDefenderEnv + env = StoppingGamePomdpDefenderEnv(config=defender_pomdp_config) + assert env.config == defender_pomdp_config + assert env.observation_space == self.config.defender_observation_space() + assert env.action_space == self.config.defender_action_space() + assert env.static_attacker_strategy == attacker_strategy + assert not env.viewer + + def test_reset(self) -> None: + """ + Tests the function for reseting the environment state + + :return: None + """ + attacker_strategy = MagicMock(spec=Policy) + defender_pomdp_config = StoppingGameDefenderPomdpConfig( + env_name="test_env", + stopping_game_config=self.config, + attacker_strategy=attacker_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + env = StoppingGamePomdpDefenderEnv(config=defender_pomdp_config) + _, info = env.reset() + assert info + + def test_render(self) -> None: + """ + Tests the function for rendering the environment + + :return: None + """ + attacker_strategy = MagicMock(spec=Policy) + defender_pomdp_config = StoppingGameDefenderPomdpConfig( + env_name="test_env", + stopping_game_config=self.config, + attacker_strategy=attacker_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + env = StoppingGamePomdpDefenderEnv(config=defender_pomdp_config) + with pytest.raises(NotImplementedError): + env.render("human") + + def test_is_defense_action_legal(self) -> None: + """ + Tests the function of checking whether a defender action in the environment is legal or not + + :return: None + """ + attacker_strategy = MagicMock(spec=Policy) + defender_pomdp_config = StoppingGameDefenderPomdpConfig( + env_name="test_env", + stopping_game_config=self.config, + attacker_strategy=attacker_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + env = StoppingGamePomdpDefenderEnv(config=defender_pomdp_config) + assert env.is_defense_action_legal(1) + + def test_is_attack_action_legal(self) -> None: + """ + Tests the function of checking whether an attacker action in the environment is legal or not + + :return: None + """ + attacker_strategy = MagicMock(spec=Policy) + defender_pomdp_config = StoppingGameDefenderPomdpConfig( + env_name="test_env", + stopping_game_config=self.config, + attacker_strategy=attacker_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + env = StoppingGamePomdpDefenderEnv(config=defender_pomdp_config) + assert env.is_attack_action_legal(1) + + def test_get_traces(self) -> None: + """ + Tests the function of getting the list of simulation traces + + :return: None + """ + attacker_strategy = MagicMock(spec=Policy) + defender_pomdp_config = StoppingGameDefenderPomdpConfig( + env_name="test_env", + stopping_game_config=self.config, + attacker_strategy=attacker_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + env = StoppingGamePomdpDefenderEnv(config=defender_pomdp_config) + assert env.get_traces() == StoppingGameEnv(self.config).traces + + def test_reset_traces(self) -> None: + """ + Tests the function of resetting the list of traces + + :return: None + """ + attacker_strategy = MagicMock(spec=Policy) + defender_pomdp_config = StoppingGameDefenderPomdpConfig( + env_name="test_env", + stopping_game_config=self.config, + attacker_strategy=attacker_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + env = StoppingGamePomdpDefenderEnv(config=defender_pomdp_config) + env.traces = ["trace1", "trace2"] + env.reset_traces() + assert StoppingGameEnv(self.config).traces == [] + + def test_set_model(self) -> None: + """ + Tests the function for setting the model + + :return: None + """ + attacker_strategy = MagicMock(spec=Policy) + defender_pomdp_config = StoppingGameDefenderPomdpConfig( + env_name="test_env", + stopping_game_config=self.config, + attacker_strategy=attacker_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + env = StoppingGamePomdpDefenderEnv(config=defender_pomdp_config) + mock_model = MagicMock() + env.set_model(mock_model) + assert env.model == mock_model + + def test_set_state(self) -> None: + """ + Tests the function for setting the state + + :return: None + """ + attacker_strategy = MagicMock(spec=Policy) + defender_pomdp_config = StoppingGameDefenderPomdpConfig( + env_name="test_env", + stopping_game_config=self.config, + attacker_strategy=attacker_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + env = StoppingGamePomdpDefenderEnv(config=defender_pomdp_config) + assert not env.set_state(1) + + def test_get_observation_from_history(self) -> None: + """ + Tests the function for getting a defender observatin (belief) from a history + """ + attacker_strategy = MagicMock(spec=Policy) + defender_pomdp_config = StoppingGameDefenderPomdpConfig( + env_name="test_env", + stopping_game_config=self.config, + attacker_strategy=attacker_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + env = StoppingGamePomdpDefenderEnv(config=defender_pomdp_config) + history = [1, 2, 3] + l = self.config.L + pi2 = env.static_attacker_strategy.stage_policy(o=0) + assert env.get_observation_from_history(history) == StoppingGameEnv( + self.config + ).get_observation_from_history(history, pi2, l) + + def test_is_state_terminal(self) -> None: + """ + Tests the funciton for checking whether a state is terminal or not + + :return: None + """ + attacker_strategy = MagicMock(spec=Policy) + defender_pomdp_config = StoppingGameDefenderPomdpConfig( + env_name="test_env", + stopping_game_config=self.config, + attacker_strategy=attacker_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + env = StoppingGamePomdpDefenderEnv(config=defender_pomdp_config) + assert env.is_state_terminal(1) == StoppingGameEnv( + self.config + ).is_state_terminal(1) + + def test_generate_random_particles(self) -> None: + """ + Tests the funtion of generating a random list of state particles from a given observation + + :return: None + """ + attacker_strategy = MagicMock(spec=Policy) + defender_pomdp_config = StoppingGameDefenderPomdpConfig( + env_name="test_env", + stopping_game_config=self.config, + attacker_strategy=attacker_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + env = StoppingGamePomdpDefenderEnv(config=defender_pomdp_config) + num_particles = 10 + particles = env.generate_random_particles(o=1, num_particles=num_particles) + assert len(particles) == num_particles + assert all(p in [0, 1] for p in particles) + + num_particles = 0 + particles = env.generate_random_particles(o=1, num_particles=num_particles) + assert len(particles) == num_particles + + def test_get_actions_from_particles(self) -> None: + """ + Tests the function for pruning the set of actions based on the current particle set + + :return: None + """ + attacker_strategy = MagicMock(spec=Policy) + defender_pomdp_config = StoppingGameDefenderPomdpConfig( + env_name="test_env", + stopping_game_config=self.config, + attacker_strategy=attacker_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + env = StoppingGamePomdpDefenderEnv(config=defender_pomdp_config) + particles = [1, 2, 3] + t = 0 + observation = 0 + expected_actions = [0, 1, 2] + assert ( + env.get_actions_from_particles(particles, t, observation) + == expected_actions + ) + + def test_step(self) -> None: + """ + Tests the function for taking a step in the environment by executing the given action + + :return: None + """ + attacker_strategy = MagicMock(spec=Policy) + defender_pomdp_config = StoppingGameDefenderPomdpConfig( + env_name="test_env", + stopping_game_config=self.config, + attacker_strategy=attacker_strategy, + stopping_game_name="csle-stopping-game-v1", + ) + env = StoppingGamePomdpDefenderEnv(config=defender_pomdp_config) + a1 = 2 + defender_obs, reward, terminated, truncated, info = env.step(a1) + assert isinstance(defender_obs, int) + assert isinstance(reward, int) + assert isinstance(terminated, bool) + assert isinstance(truncated, bool) + assert isinstance(info, dict)