From fe87b040d849de782df007cf8dd376ae9522b41e Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Wed, 24 Apr 2024 15:18:23 +0200 Subject: [PATCH 01/16] initial commit csle tolerance tests --- .../csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py index 50a4829ab..9a0993585 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py @@ -1,7 +1,7 @@ from csle_tolerance.util.intrusion_recovery_pomdp_util import IntrusionRecoveryPomdpUtil -class TestIntrusionTolerancePomdpSuite(object): +class TestIntrusionTolerancePomdpSuite: """ Test suite for stopping_game_util.py """ From 3235191da0510ce677cc8ce60ff645348ae3b31e Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Wed, 1 May 2024 22:43:58 +0200 Subject: [PATCH 02/16] tolerance tests --- .../test_intrusion_response_cmdp_util.py | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py new file mode 100644 index 000000000..b5b45240d --- /dev/null +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py @@ -0,0 +1,43 @@ +from csle_tolerance.util.intrusion_response_cmdp_util import IntrusionResponseCmdpUtil + + +class TestInstrusionToleranceCmdpSuite: + """ + Test suite for intrusion_response_cmdp_util.py + """ + + def test_state_space(self) -> None: + """ + Tests the state space of the CMDP + + :return: None + """ + assert len(IntrusionResponseCmdpUtil.state_space(2)) != 0 + assert IntrusionResponseCmdpUtil.state_space(1) is not None + assert IntrusionResponseCmdpUtil.state_space(0) == [0] + assert all(isinstance(n,int) for n in IntrusionResponseCmdpUtil.state_space(3)) + + def test_action_space(self) -> None: + """ + Tests the action space of the CMDP + + :return: None + """ + assert IntrusionResponseCmdpUtil.action_space() == [0,1] + + def test_cost_function(self) -> None: + """ + Tests the cost function of the CMDP + + :return: None + """ + s = 1 + assert IntrusionResponseCmdpUtil.cost_function(s,True) == -1.0 + assert IntrusionResponseCmdpUtil.cost_function(s,False) == 1.0 + assert isinstance(IntrusionResponseCmdpUtil.cost_function(s,False),float) + + + + + + From b97712c30ca829ebe10fcfdcb130585de59e8456 Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Wed, 1 May 2024 22:44:43 +0200 Subject: [PATCH 03/16] tolerance tests --- .../csle-tolerance/tests/test_intrusion_response_cmdp_util.py | 1 + 1 file changed, 1 insertion(+) diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py index b5b45240d..c81d75c1b 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py @@ -35,6 +35,7 @@ def test_cost_function(self) -> None: assert IntrusionResponseCmdpUtil.cost_function(s,True) == -1.0 assert IntrusionResponseCmdpUtil.cost_function(s,False) == 1.0 assert isinstance(IntrusionResponseCmdpUtil.cost_function(s,False),float) + assert isinstance(IntrusionResponseCmdpUtil.cost_function(s,True),float) From 8c3f8daada0ac7c3f3ce402e90bc5af3b6d8f967 Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Fri, 3 May 2024 19:15:43 +0200 Subject: [PATCH 04/16] added tests functions --- .../test_intrusion_recovery_pomdp_util.py | 2 +- .../test_intrusion_response_cmdp_util.py | 93 +++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py index 9a0993585..a7019234e 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py @@ -3,7 +3,7 @@ class TestIntrusionTolerancePomdpSuite: """ - Test suite for stopping_game_util.py + Test suite for intrusion_recovery_pomdp_util.py """ def test_initial_belief(self) -> None: diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py index c81d75c1b..86998ca70 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py @@ -37,6 +37,99 @@ def test_cost_function(self) -> None: assert isinstance(IntrusionResponseCmdpUtil.cost_function(s,False),float) assert isinstance(IntrusionResponseCmdpUtil.cost_function(s,True),float) + def test_cost_tensor(self) -> None: + """ + Tests the function of creating a tensor with the costs of the CMDP + + :return: None + """ + states = [0,1,2,3] + expected = [-0.0,-1.0,-2.0,-3.0] + assert IntrusionResponseCmdpUtil.cost_tensor(states,True) == expected + assert IntrusionResponseCmdpUtil.cost_tensor(states,False) == [-item for item in expected] + + def test_constraint_cost_function(self) -> None: + """ + Tests the constraint cost function of the CMDP + + :return: None + """ + states = [1,2] + f = 0.5 + expected = [0.0,1.0] + assert IntrusionResponseCmdpUtil.constraint_cost_function(states[0],f) == expected[0] + assert IntrusionResponseCmdpUtil.constraint_cost_function(states[1],f) == expected[1] + + def test_constraint_cost_tensor(self) -> None: + """ + Tests the function of creating a tensor with the constrained costs of the CMDP + + :return: None + """ + states = [1,2] + f = 0.5 + expected = [0.0,1.0] + assert IntrusionResponseCmdpUtil.constraint_cost_tensor(states,f) == expected + + def test_delta_function(self) -> None: + """ + Tests the delta function that gives the probability of the change in the number of the healthy nodes + + :return: None + """ + s = 1 + s_max = 2 + p_a = 0.2 + p_c = 0.5 + p_u = 0.7 + delta = 1 + expected = 1.35 + assert round(IntrusionResponseCmdpUtil.delta_function(s,p_a,p_c,p_u,delta,s_max),2) == expected + + def test_transition_function(self) -> None: + """ + Tests the transition function of the CMDP + + :return: None + """ + s = 0 + s_prime = 1 + a = 1 + s_max = 2 + p_a = 0.2 + p_c = 0.3 + p_u = 0.5 + delta = s_prime - a - s + assert IntrusionResponseCmdpUtil.delta_function(s,p_a,p_c,p_u,delta,s_max) == 3 + + def test_transition_tensor(self) -> None: + """ + Tests the transition tensor function of the CMDP + + :return: None + """ + states = [0,1] + actions = [0] + s_max = 2 + p_a = 0.2 + p_c = 0.3 + p_u = 0.5 + expected = [[[3/5,2/5],[2/5,3/5]]] + transition_tensor = IntrusionResponseCmdpUtil.transition_tensor(states,actions,p_a,p_c,p_u,s_max) + for i in range(len(transition_tensor)): + for j in range(len(transition_tensor[i])): + for k in range(len(transition_tensor[i][j])): + transition_tensor[i][j][k] = round(transition_tensor[i][j][k],1) + assert transition_tensor == expected + + + + + + + + + From 1e378a170b500aa45bf46bdf493203080d31b2c4 Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Sat, 4 May 2024 09:45:40 +0200 Subject: [PATCH 05/16] remove redundant lines --- .../src/csle_tolerance/util/intrusion_response_cmdp_util.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_response_cmdp_util.py b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_response_cmdp_util.py index ebb056514..3b4caa385 100644 --- a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_response_cmdp_util.py +++ b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_response_cmdp_util.py @@ -142,9 +142,6 @@ def transition_tensor(states: List[int], actions: List[int], p_a: float, p_c: fl for s in states: s_a_transitions = [] normalizing_constant = 0.0 - for delta in range(-s_max, s_max + 1): - normalizing_constant += IntrusionResponseCmdpUtil.delta_function(s=s, p_a=p_a, p_c=p_c, p_u=p_u, - delta=delta, s_max=s_max) for s_prime in states: s_a_transitions.append(IntrusionResponseCmdpUtil.transition_function( s=s, s_prime=s_prime, a=a, p_a=p_a, p_c=p_c, p_u=p_u, s_max=s_max)) From 31c01543e45c6eb90ff0857c52bd75719767f9ee Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Mon, 6 May 2024 15:48:04 +0200 Subject: [PATCH 06/16] add intrusion_recovery_pomdp_util tests --- .../test_intrusion_recovery_pomdp_util.py | 198 +++++++++++++++++- 1 file changed, 197 insertions(+), 1 deletion(-) diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py index a7019234e..69482569d 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py @@ -1,11 +1,22 @@ from csle_tolerance.util.intrusion_recovery_pomdp_util import IntrusionRecoveryPomdpUtil - +from csle_tolerance.dao.intrusion_recovery_pomdp_config import IntrusionRecoveryPomdpConfig +import numpy as np class TestIntrusionTolerancePomdpSuite: """ Test suite for intrusion_recovery_pomdp_util.py """ + def test__state_space(self) -> None: + """ + Tests the state space of the POMDP + + :return: None + """ + assert (isinstance(item,int) for item in IntrusionRecoveryPomdpUtil.state_space()) + assert IntrusionRecoveryPomdpUtil.state_space() is not None + assert IntrusionRecoveryPomdpUtil.state_space() == [0,1,2] + def test_initial_belief(self) -> None: """ Tests the initial_belief function @@ -13,3 +24,188 @@ def test_initial_belief(self) -> None: :return: None """ assert sum(IntrusionRecoveryPomdpUtil.initial_belief(p_a=0.5)) == 1 + + def test_action_space(self) -> None: + """ + Tests the action space of the POMDP + + :return: None + """ + assert (isinstance(item,int) for item in IntrusionRecoveryPomdpUtil.action_space()) + assert IntrusionRecoveryPomdpUtil.action_space() is not None + assert IntrusionRecoveryPomdpUtil.action_space() == [0,1] + + def test_observation_space(self) -> None: + """ + Tests the observation space of the POMDP + + :return: None + """ + num_observation = 3 + expected = [0,1,2] + assert IntrusionRecoveryPomdpUtil.observation_space(num_observation) == expected + + def test_cost_function(self) -> None: + """ + Tests the cost function of the POMDP + + :return: None + """ + s = 1 + a = 0 + eta = 0.5 + negate = False + assert IntrusionRecoveryPomdpUtil.cost_function(s,a,eta,negate) == 0.5 + + def test_cost_tensor(self) -> None: + """ + Tests the function of creating a tensor with the costs of the POMDP + + :return: None + """ + eta = 0.5 + states = [0,1] + actions = [0] + negate = False + expected = [[0,0.5]] + assert IntrusionRecoveryPomdpUtil.cost_tensor(eta,states,actions,negate) == expected + + def test_observation_function(self) -> None: + """ + Tests the observation function of the POMDP + + :return: None + """ + s = 1 + o = 1 + num_observations = 2 + expected = 0.6 + assert round(IntrusionRecoveryPomdpUtil.observation_function(s,o,num_observations),1) + + def test_observation_tensor(self) -> None: + """ + Tests the function of creating a tensor with observation probabilities + + :return: None + """ + states = [0,1] + observations = [0,1] + expected = [[0.8,0.2],[0.4,0.6]] + obs_tensor = IntrusionRecoveryPomdpUtil.observation_tensor(states,observations) + for i in range(len(obs_tensor)): + for j in range(len(obs_tensor[i])): + obs_tensor[i][j] = round(obs_tensor[i][j],1) + assert sum(obs_tensor[i]) == 1 + assert obs_tensor == expected + + def test_transition_function(self) -> None: + """ + Tests the transition function of the POMDP + + :return: None + """ + s = 0 + s_prime = 1 + a = 0 + p_a = 0.3 + p_c_1 = 0.3 + p_c_2 = 0.5 + p_u = 0.2 + assert round(IntrusionRecoveryPomdpUtil.transition_function(s,s_prime,a,p_a,p_c_1,p_c_2,p_u),1) == 0.2 + + def test_transition_tensor(self) -> None: + """ + Tests the function of creating a tensor with the transition probabilities of the POMDP + + :return: None + """ + states = [0,1] + actions = [0] + p_a = 0.3 + p_c_1 = 0.3 + p_c_2 = 0.5 + p_u = 0.2 + expected = [[[0.5,0.2],[0.1,0.4]]] + transition_tensor = IntrusionRecoveryPomdpUtil.transition_tensor(states,actions,p_a,p_c_1,p_c_2,p_u) + for i in range(len(transition_tensor)): + for j in range(len(transition_tensor[i])): + for k in range(len(transition_tensor[i][j])): + transition_tensor[i][j][k] = round(transition_tensor[i][j][k],1) + assert transition_tensor == expected + + def test_sample_initial_state(self) -> None: + """ + Tests the function of sampling the initial state + + :return: None + """ + b1 = [0.2,0.8] + assert isinstance(IntrusionRecoveryPomdpUtil.sample_initial_state(b1),int) + assert IntrusionRecoveryPomdpUtil.sample_initial_state(b1) <= len(b1) + assert IntrusionRecoveryPomdpUtil.sample_initial_state(b1) >= 0 + + def test_sampe_next_observation(self) -> None: + """ + Tests the function of sampling the next observation + + :return: None + """ + observation_tensor = [[0.8,0.2],[0.4,0.6]] + s_prime = 1 + observations = [0,1] + assert isinstance(IntrusionRecoveryPomdpUtil.sample_next_observation(observation_tensor,s_prime,observations),int) + + def test_bayes_filter(self) -> None: + """ + Tests the function of a bayesian filter to computer b[s_prime] of the POMDP + + :return: None + """ + s_prime = 1 + o = 0 + a = 0 + b = [0.2,0.8] + states = [0,1] + observations = [0,1] + observation_tensor = [[0.8,0.2],[0.4,0.6]] + transition_tensor = [[[0.5,0.2],[0.1,0.4]]] + b_prime_s_prime = 0.5 + assert round(IntrusionRecoveryPomdpUtil.bayes_filter(s_prime,o,a,b,states,observations,observation_tensor,transition_tensor),1) == b_prime_s_prime + + def test_p_o_given_b_a1_a2(self) -> None: + """ + Tests the function of computing P[o|a,b] of the POMDP + + :return: None + """ + o = 0 + b = [0.2,0.8] + a = 0 + states = [0,1] + observation_tensor = [[0.8,0.2],[0.4,0.6]] + transition_tensor = [[[0.5,0.2],[0.1,0.4]]] + expected = 0.3 + assert round(IntrusionRecoveryPomdpUtil.p_o_given_b_a1_a2(o,b,a,states,transition_tensor,observation_tensor),1) == expected + + def test_next_belief(self) -> None: + """ + Tests the funtion of computing the next belief using a Bayesian filter + + :return: None + """ + o = 0 + a = 0 + b = [0.2,0.8] + states = [0,1] + observations = [0,1] + observation_tensor = [[0.8,0.2],[0.4,0.6]] + transition_tensor = [[[0.5,0.2],[0.1,0.4]]] + assert sum(IntrusionRecoveryPomdpUtil.next_belief(o,a,b,states,observations,observation_tensor,transition_tensor)) == 1 + + def test_pomdp_solver_file(self) -> None: + """ + Tests the function of getting the POMDP environment specification + + :return: None + """ + assert IntrusionRecoveryPomdpUtil.pomdp_solver_file(IntrusionRecoveryPomdpConfig) is not None \ No newline at end of file From a1799631aa1ad83676f49f03c89e7c2a53ec2327 Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Mon, 6 May 2024 15:55:20 +0200 Subject: [PATCH 07/16] add intrusion_recovery_pomdp_util tests --- .../csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py index 69482569d..2c639805a 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py @@ -208,4 +208,5 @@ def test_pomdp_solver_file(self) -> None: :return: None """ - assert IntrusionRecoveryPomdpUtil.pomdp_solver_file(IntrusionRecoveryPomdpConfig) is not None \ No newline at end of file + assert IntrusionRecoveryPomdpUtil.pomdp_solver_file(IntrusionRecoveryPomdpConfig) is not None + \ No newline at end of file From da2b1ffa7ecb19218353f92672d59e7c3886368d Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Tue, 7 May 2024 11:58:42 +0200 Subject: [PATCH 08/16] recovery_tests --- .../test_intrusion_recovery_pomdp_util.py | 36 ++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py index 2c639805a..eec699509 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py @@ -107,10 +107,10 @@ def test_transition_function(self) -> None: s = 0 s_prime = 1 a = 0 - p_a = 0.3 - p_c_1 = 0.3 - p_c_2 = 0.5 - p_u = 0.2 + p_a = 0.2 + p_c_1 = 0.1 + p_c_2 = 0.2 + p_u = 0.5 assert round(IntrusionRecoveryPomdpUtil.transition_function(s,s_prime,a,p_a,p_c_1,p_c_2,p_u),1) == 0.2 def test_transition_tensor(self) -> None: @@ -121,11 +121,11 @@ def test_transition_tensor(self) -> None: """ states = [0,1] actions = [0] - p_a = 0.3 - p_c_1 = 0.3 - p_c_2 = 0.5 - p_u = 0.2 - expected = [[[0.5,0.2],[0.1,0.4]]] + p_a = 0.2 + p_c_1 = 0.1 + p_c_2 = 0.2 + p_u = 0.5 + expected = [[[0.7,0.2],[0.4,0.4]]] transition_tensor = IntrusionRecoveryPomdpUtil.transition_tensor(states,actions,p_a,p_c_1,p_c_2,p_u) for i in range(len(transition_tensor)): for j in range(len(transition_tensor[i])): @@ -168,8 +168,8 @@ def test_bayes_filter(self) -> None: states = [0,1] observations = [0,1] observation_tensor = [[0.8,0.2],[0.4,0.6]] - transition_tensor = [[[0.5,0.2],[0.1,0.4]]] - b_prime_s_prime = 0.5 + transition_tensor = [[[0.6,0.4],[0.1,0.9]]] + b_prime_s_prime = 0.7 assert round(IntrusionRecoveryPomdpUtil.bayes_filter(s_prime,o,a,b,states,observations,observation_tensor,transition_tensor),1) == b_prime_s_prime def test_p_o_given_b_a1_a2(self) -> None: @@ -183,8 +183,8 @@ def test_p_o_given_b_a1_a2(self) -> None: a = 0 states = [0,1] observation_tensor = [[0.8,0.2],[0.4,0.6]] - transition_tensor = [[[0.5,0.2],[0.1,0.4]]] - expected = 0.3 + transition_tensor = [[[0.6,0.4],[0.1,0.9]]] + expected = 0.5 assert round(IntrusionRecoveryPomdpUtil.p_o_given_b_a1_a2(o,b,a,states,transition_tensor,observation_tensor),1) == expected def test_next_belief(self) -> None: @@ -199,8 +199,8 @@ def test_next_belief(self) -> None: states = [0,1] observations = [0,1] observation_tensor = [[0.8,0.2],[0.4,0.6]] - transition_tensor = [[[0.5,0.2],[0.1,0.4]]] - assert sum(IntrusionRecoveryPomdpUtil.next_belief(o,a,b,states,observations,observation_tensor,transition_tensor)) == 1 + transition_tensor = [[[0.3,0.7],[0.6,0.4]]] + assert round(sum(IntrusionRecoveryPomdpUtil.next_belief(o,a,b,states,observations,observation_tensor,transition_tensor)),1) == 1 def test_pomdp_solver_file(self) -> None: """ @@ -208,5 +208,7 @@ def test_pomdp_solver_file(self) -> None: :return: None """ - assert IntrusionRecoveryPomdpUtil.pomdp_solver_file(IntrusionRecoveryPomdpConfig) is not None - \ No newline at end of file + + assert IntrusionRecoveryPomdpUtil.pomdp_solver_file(IntrusionRecoveryPomdpConfig(eta=0.1,p_a=0.4,p_c_1=0.3,p_c_2=0.3,p_u=0.5,BTR=1,negate_costs=True,seed=1,discount_factor = 0.5, + states=[0,1],actions=[0],observations=[0,1],cost_tensor=[[0.1,0.5],[0.5,0.6]],observation_tensor = [[0.8,0.2],[0.4,0.6]], + transition_tensor = [[[0.8,0.2],[0.6,0.4]]],b1=[0.3,0.7],T=3,simulation_env_name="env",gym_env_name="gym",max_horizon=np.inf)) is not None From e5304b548529e8d973c3fd19d0894e1d446a0bb9 Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Wed, 8 May 2024 13:03:26 +0200 Subject: [PATCH 09/16] test_general_util --- .../csle-tolerance/tests/test_general_util.py | 60 +++++++++++++++++++ .../test_intrusion_recovery_pomdp_util.py | 2 +- 2 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 simulation-system/libs/csle-tolerance/tests/test_general_util.py diff --git a/simulation-system/libs/csle-tolerance/tests/test_general_util.py b/simulation-system/libs/csle-tolerance/tests/test_general_util.py new file mode 100644 index 000000000..622c91272 --- /dev/null +++ b/simulation-system/libs/csle-tolerance/tests/test_general_util.py @@ -0,0 +1,60 @@ +from csle_tolerance.util.general_util import GeneralUtil +import math +import numpy as np +class TestIntrusionToleranceGeneralSuite: + """ + Test suite for general_util.py + """ + + def test_threshold_probability(self) -> None: + """ + Tests the functuon of returning the probability of taking an action given a belief and a threshold + + :return: None + """ + b1 = 0.5 + threshold = 0.3 + expected = 1 + assert round(GeneralUtil.threshold_probability(b1,threshold,-20) == math.pow(1 + math.pow(((b1 * (1 - threshold)) / (threshold * (1 - b1))), -20), -1),1) == expected + + def test_sigmoid(self) -> None: + """ + Tests the sigmoid function + + :return: None + """ + x = 1 + expected = 0.7 + assert round(GeneralUtil.sigmoid(x),1) == expected + + def test_inverse_sigmoid(self) -> None: + """ + Tests the inverse sigmoid function + + :return: None + """ + x = 0.1 + expected = -2.2 + assert round(GeneralUtil.inverse_sigmoid(x),1) == expected + + def test_sample_next_state(self) -> None: + """ + Tests the function of sampling the next state of a MDP or POMDP + + :return: None + """ + transition_tensor = [[[0.6,0.4],[0.4,0.6]]] + s = 0 + a = 0 + states = [0,1] + state_probs = [0.6,0.4] + expected = np.arange(0,len(states)) + assert GeneralUtil.sample_next_state(transition_tensor,s,a,states) in expected + + def test_register_envs(self) -> None: + """ + Tests the utility method for registering Gymnasum environments + + :return: None + """ + pass diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py index eec699509..0bd726a32 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py @@ -209,6 +209,6 @@ def test_pomdp_solver_file(self) -> None: :return: None """ - assert IntrusionRecoveryPomdpUtil.pomdp_solver_file(IntrusionRecoveryPomdpConfig(eta=0.1,p_a=0.4,p_c_1=0.3,p_c_2=0.3,p_u=0.5,BTR=1,negate_costs=True,seed=1,discount_factor = 0.5, + assert IntrusionRecoveryPomdpUtil.pomdp_solver_file(IntrusionRecoveryPomdpConfig(eta=0.1,p_a=0.2,p_c_1=0.2,p_c_2=0.3,p_u=0.3,BTR=1,negate_costs=True,seed=1,discount_factor = 0.5, states=[0,1],actions=[0],observations=[0,1],cost_tensor=[[0.1,0.5],[0.5,0.6]],observation_tensor = [[0.8,0.2],[0.4,0.6]], transition_tensor = [[[0.8,0.2],[0.6,0.4]]],b1=[0.3,0.7],T=3,simulation_env_name="env",gym_env_name="gym",max_horizon=np.inf)) is not None From b09587a893f851cc8ba8f0aa8240ef71ab063357 Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Tue, 14 May 2024 15:01:26 +0200 Subject: [PATCH 10/16] add tests --- .../csle-tolerance/tests/test_general_util.py | 35 +- .../test_intrusion_recovery_pomdp_env.py | 357 ++++++++++++++++++ .../test_intrusion_recovery_pomdp_util.py | 197 +++++++--- .../test_intrusion_response_cmdp_util.py | 89 +++-- .../test_intrusion_response_pomdp_env.py | 326 ++++++++++++++++ .../tests/test_pomdp_solve_parser.py | 54 +++ 6 files changed, 956 insertions(+), 102 deletions(-) create mode 100644 simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_env.py create mode 100644 simulation-system/libs/csle-tolerance/tests/test_intrusion_response_pomdp_env.py create mode 100644 simulation-system/libs/csle-tolerance/tests/test_pomdp_solve_parser.py diff --git a/simulation-system/libs/csle-tolerance/tests/test_general_util.py b/simulation-system/libs/csle-tolerance/tests/test_general_util.py index 622c91272..96c3094a4 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_general_util.py +++ b/simulation-system/libs/csle-tolerance/tests/test_general_util.py @@ -1,6 +1,8 @@ from csle_tolerance.util.general_util import GeneralUtil -import math import numpy as np +import pytest_mock + + class TestIntrusionToleranceGeneralSuite: """ Test suite for general_util.py @@ -15,7 +17,9 @@ def test_threshold_probability(self) -> None: b1 = 0.5 threshold = 0.3 expected = 1 - assert round(GeneralUtil.threshold_probability(b1,threshold,-20) == math.pow(1 + math.pow(((b1 * (1 - threshold)) / (threshold * (1 - b1))), -20), -1),1) == expected + assert ( + round(GeneralUtil.threshold_probability(b1, threshold, -20), 1) == expected + ) def test_sigmoid(self) -> None: """ @@ -25,7 +29,7 @@ def test_sigmoid(self) -> None: """ x = 1 expected = 0.7 - assert round(GeneralUtil.sigmoid(x),1) == expected + assert round(GeneralUtil.sigmoid(x), 1) == expected def test_inverse_sigmoid(self) -> None: """ @@ -35,7 +39,7 @@ def test_inverse_sigmoid(self) -> None: """ x = 0.1 expected = -2.2 - assert round(GeneralUtil.inverse_sigmoid(x),1) == expected + assert round(GeneralUtil.inverse_sigmoid(x), 1) == expected def test_sample_next_state(self) -> None: """ @@ -43,18 +47,25 @@ def test_sample_next_state(self) -> None: :return: None """ - transition_tensor = [[[0.6,0.4],[0.4,0.6]]] + transition_tensor = [[[0.6, 0.4], [0.4, 0.6]]] s = 0 a = 0 - states = [0,1] - state_probs = [0.6,0.4] - expected = np.arange(0,len(states)) - assert GeneralUtil.sample_next_state(transition_tensor,s,a,states) in expected - - def test_register_envs(self) -> None: + states = [0, 1] + expected = np.arange(0, len(states)) + assert ( + GeneralUtil.sample_next_state(transition_tensor, s, a, states) in expected + ) + + def test_register_envs(self, mocker: pytest_mock.MockFixture) -> None: """ Tests the utility method for registering Gymnasum environments :return: None """ - pass + mocked_register = mocker.MagicMock() + mocker.patch( + "gymnasium.envs.registration.register", return_value=mocked_register + ) + mocked_register.configure_mock(**{"__enter__.return_value": None}) + return_value = GeneralUtil.register_envs() + assert return_value is None diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_env.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_env.py new file mode 100644 index 000000000..4f8f223d0 --- /dev/null +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_env.py @@ -0,0 +1,357 @@ +from unittest.mock import patch, Mock +from csle_tolerance.envs.intrusion_recovery_pomdp_env import IntrusionRecoveryPomdpEnv +from csle_tolerance.dao.intrusion_recovery_pomdp_config import ( + IntrusionRecoveryPomdpConfig, +) +import csle_tolerance.constants.constants as env_constants +import csle_common.constants.constants as constants +import numpy as np +import pytest + + +class TestInstrusionRecoveryPomdpEnvSuite: + """ + Test suite for intrusion_recovery_pomdp_env.py + """ + + def test__init__(self) -> None: + """ + Tests the function of initializing the environment + + :return: None + """ + config = IntrusionRecoveryPomdpConfig( + eta=0.1, + p_a=0.2, + p_c_1=0.2, + p_c_2=0.3, + p_u=0.3, + BTR=1, + negate_costs=True, + seed=1, + discount_factor=0.5, + states=[0, 1], + actions=[0], + observations=[0, 1], + cost_tensor=[[0.1, 0.5], [0.5, 0.6]], + observation_tensor=[[0.8, 0.2], [0.4, 0.6]], + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + b1=[1, 0], + T=3, + simulation_env_name="env", + gym_env_name="gym", + max_horizon=np.inf, + ) + assert IntrusionRecoveryPomdpEnv(config).t == 1 + assert IntrusionRecoveryPomdpEnv(config).s == 0 + assert IntrusionRecoveryPomdpEnv(config).o == 0 + + def test_step(self) -> None: + """ + Tests the step function (Takes a step in the environment + by executing the given action) + + :return: None + """ + config = IntrusionRecoveryPomdpConfig( + eta=0.1, + p_a=0.2, + p_c_1=0.2, + p_c_2=0.3, + p_u=0.3, + BTR=1, + negate_costs=True, + seed=1, + discount_factor=0.5, + states=[0, 1], + actions=[0], + observations=[0, 1], + cost_tensor=[[0.1, 0.5], [0.5, 0.6]], + observation_tensor=[[0.8, 0.2], [0.4, 0.6]], + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + b1=[1, 0], + T=3, + simulation_env_name="env", + gym_env_name="gym", + max_horizon=np.inf, + ) + a = 0 + c = 0.1 + + assert IntrusionRecoveryPomdpEnv(config).step(a)[0][0] == 2 + assert IntrusionRecoveryPomdpEnv(config).step(a)[1] == c + assert ( + IntrusionRecoveryPomdpEnv(config).step(a)[4][ + env_constants.ENV_METRICS.DEFENDER_ACTION + ] + == 0 + ) + assert ( + IntrusionRecoveryPomdpEnv(config).step(a)[4][ + env_constants.ENV_METRICS.TIME_STEP + ] + == 2 + ) + + def test_reset(self) -> None: + """ + Tests the reset function (Resets the environment state) + + :return: None + """ + config = IntrusionRecoveryPomdpConfig( + eta=0.1, + p_a=0.2, + p_c_1=0.2, + p_c_2=0.3, + p_u=0.3, + BTR=1, + negate_costs=True, + seed=1, + discount_factor=0.5, + states=[0, 1], + actions=[0], + observations=[0, 1], + cost_tensor=[[0.1, 0.5], [0.5, 0.6]], + observation_tensor=[[0.8, 0.2], [0.4, 0.6]], + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + b1=[1, 0], + T=3, + simulation_env_name="env", + gym_env_name="gym", + max_horizon=np.inf, + ) + assert IntrusionRecoveryPomdpEnv(config).reset()[0][0] == 1 + assert IntrusionRecoveryPomdpEnv(config).reset()[0][1] == 0 + assert IntrusionRecoveryPomdpEnv(config).reset()[0][2] == 0 + + def test_info(self) -> None: + """ + Tests the function of adding the cumulative reward and episode length to the info dict + + :return: None + """ + config = IntrusionRecoveryPomdpConfig( + eta=0.1, + p_a=0.2, + p_c_1=0.2, + p_c_2=0.3, + p_u=0.3, + BTR=1, + negate_costs=True, + seed=1, + discount_factor=0.5, + states=[0, 1], + actions=[0], + observations=[0, 1], + cost_tensor=[[0.1, 0.5], [0.5, 0.6]], + observation_tensor=[[0.8, 0.2], [0.4, 0.6]], + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + b1=[1, 0], + T=3, + simulation_env_name="env", + gym_env_name="gym", + max_horizon=np.inf, + ) + info = {} + with pytest.raises(IndexError): + assert IntrusionRecoveryPomdpEnv(config)._info(info) is not None + + def test_render(self) -> None: + """ + Tests the function of rendering the environment. + + :return: None + """ + config = IntrusionRecoveryPomdpConfig( + eta=0.1, + p_a=0.2, + p_c_1=0.2, + p_c_2=0.3, + p_u=0.3, + BTR=1, + negate_costs=True, + seed=1, + discount_factor=0.5, + states=[0, 1], + actions=[0], + observations=[0, 1], + cost_tensor=[[0.1, 0.5], [0.5, 0.6]], + observation_tensor=[[0.8, 0.2], [0.4, 0.6]], + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + b1=[1, 0], + T=3, + simulation_env_name="env", + gym_env_name="gym", + max_horizon=np.inf, + ) + with pytest.raises(NotImplementedError): + IntrusionRecoveryPomdpEnv(config).render() + + def test_get_traces(self) -> None: + """ + Tests the function of getting the list of simulation traces + + :return: None + """ + config = IntrusionRecoveryPomdpConfig( + eta=0.1, + p_a=0.2, + p_c_1=0.2, + p_c_2=0.3, + p_u=0.3, + BTR=1, + negate_costs=True, + seed=1, + discount_factor=0.5, + states=[0, 1], + actions=[0], + observations=[0, 1], + cost_tensor=[[0.1, 0.5], [0.5, 0.6]], + observation_tensor=[[0.8, 0.2], [0.4, 0.6]], + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + b1=[1, 0], + T=3, + simulation_env_name="env", + gym_env_name="gym", + max_horizon=np.inf, + ) + returned_traces = IntrusionRecoveryPomdpEnv(config).get_traces() + assert not returned_traces + + def test_reset_traces(self) -> None: + """ + Tests the function of reseting the list of traces + + :return: None + """ + config = IntrusionRecoveryPomdpConfig( + eta=0.1, + p_a=0.2, + p_c_1=0.2, + p_c_2=0.3, + p_u=0.3, + BTR=1, + negate_costs=True, + seed=1, + discount_factor=0.5, + states=[0, 1], + actions=[0], + observations=[0, 1], + cost_tensor=[[0.1, 0.5], [0.5, 0.6]], + observation_tensor=[[0.8, 0.2], [0.4, 0.6]], + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + b1=[1, 0], + T=3, + simulation_env_name="env", + gym_env_name="gym", + max_horizon=np.inf, + ) + assert IntrusionRecoveryPomdpEnv(config).reset_traces() is None + + @patch("time.time") # Mock the time.time function + @patch( + "csle_common.dao.simulation_config.simulation_trace.SimulationTrace.save_traces" + ) # Mock the method + def test_checkpoint_traces(self, mock_save_traces, mock_time) -> None: + """ + Tests the function of checkpointing agent traces + + :return: None + """ + config = IntrusionRecoveryPomdpConfig( + eta=0.1, + p_a=0.2, + p_c_1=0.2, + p_c_2=0.3, + p_u=0.3, + BTR=1, + negate_costs=True, + seed=1, + discount_factor=0.5, + states=[0, 1], + actions=[0], + observations=[0, 1], + cost_tensor=[[0.1, 0.5], [0.5, 0.6]], + observation_tensor=[[0.8, 0.2], [0.4, 0.6]], + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + b1=[1, 0], + T=3, + simulation_env_name="env", + gym_env_name="gym", + max_horizon=np.inf, + ) + mock_time.return_value = 1234567890 + environment = IntrusionRecoveryPomdpEnv(config) + environment._IntrusionRecoveryPomdpEnv__checkpoint_traces() + mock_save_traces.assert_called_once_with( + traces_save_dir=constants.LOGGING.DEFAULT_LOG_DIR, + traces=environment.traces, + traces_file=f"taus{mock_time.return_value}.json", + ) + + def test_set_model(self) -> None: + """ + Tests the function of setting the model + + :return: None + """ + mock_model = Mock() + config = IntrusionRecoveryPomdpConfig( + eta=0.1, + p_a=0.2, + p_c_1=0.2, + p_c_2=0.3, + p_u=0.3, + BTR=1, + negate_costs=True, + seed=1, + discount_factor=0.5, + states=[0, 1], + actions=[0], + observations=[0, 1], + cost_tensor=[[0.1, 0.5], [0.5, 0.6]], + observation_tensor=[[0.8, 0.2], [0.4, 0.6]], + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + b1=[1, 0], + T=3, + simulation_env_name="env", + gym_env_name="gym", + max_horizon=np.inf, + ) + environment = IntrusionRecoveryPomdpEnv(config) + environment.set_model(mock_model) + assert environment.model == mock_model + + def test_set_state(self) -> None: + """ + Tests the function of setting the state + + :return: None + """ + mock_state = Mock() + config = IntrusionRecoveryPomdpConfig( + eta=0.1, + p_a=0.2, + p_c_1=0.2, + p_c_2=0.3, + p_u=0.3, + BTR=1, + negate_costs=True, + seed=1, + discount_factor=0.5, + states=[0, 1], + actions=[0], + observations=[0, 1], + cost_tensor=[[0.1, 0.5], [0.5, 0.6]], + observation_tensor=[[0.8, 0.2], [0.4, 0.6]], + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + b1=[1, 0], + T=3, + simulation_env_name="env", + gym_env_name="gym", + max_horizon=np.inf, + ) + environment = IntrusionRecoveryPomdpEnv(config) + environment.set_state(mock_state) + assert environment.s == mock_state diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py index 0bd726a32..87a272aff 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_util.py @@ -1,6 +1,10 @@ from csle_tolerance.util.intrusion_recovery_pomdp_util import IntrusionRecoveryPomdpUtil -from csle_tolerance.dao.intrusion_recovery_pomdp_config import IntrusionRecoveryPomdpConfig +from csle_tolerance.dao.intrusion_recovery_pomdp_config import ( + IntrusionRecoveryPomdpConfig, +) import numpy as np +import pytest + class TestIntrusionTolerancePomdpSuite: """ @@ -13,9 +17,11 @@ def test__state_space(self) -> None: :return: None """ - assert (isinstance(item,int) for item in IntrusionRecoveryPomdpUtil.state_space()) + assert ( + isinstance(item, int) for item in IntrusionRecoveryPomdpUtil.state_space() + ) assert IntrusionRecoveryPomdpUtil.state_space() is not None - assert IntrusionRecoveryPomdpUtil.state_space() == [0,1,2] + assert IntrusionRecoveryPomdpUtil.state_space() == [0, 1, 2] def test_initial_belief(self) -> None: """ @@ -31,9 +37,11 @@ def test_action_space(self) -> None: :return: None """ - assert (isinstance(item,int) for item in IntrusionRecoveryPomdpUtil.action_space()) + assert ( + isinstance(item, int) for item in IntrusionRecoveryPomdpUtil.action_space() + ) assert IntrusionRecoveryPomdpUtil.action_space() is not None - assert IntrusionRecoveryPomdpUtil.action_space() == [0,1] + assert IntrusionRecoveryPomdpUtil.action_space() == [0, 1] def test_observation_space(self) -> None: """ @@ -42,7 +50,7 @@ def test_observation_space(self) -> None: :return: None """ num_observation = 3 - expected = [0,1,2] + expected = [0, 1, 2] assert IntrusionRecoveryPomdpUtil.observation_space(num_observation) == expected def test_cost_function(self) -> None: @@ -55,20 +63,23 @@ def test_cost_function(self) -> None: a = 0 eta = 0.5 negate = False - assert IntrusionRecoveryPomdpUtil.cost_function(s,a,eta,negate) == 0.5 + assert IntrusionRecoveryPomdpUtil.cost_function(s, a, eta, negate) == 0.5 def test_cost_tensor(self) -> None: """ Tests the function of creating a tensor with the costs of the POMDP - + :return: None """ eta = 0.5 - states = [0,1] + states = [0, 1] actions = [0] negate = False - expected = [[0,0.5]] - assert IntrusionRecoveryPomdpUtil.cost_tensor(eta,states,actions,negate) == expected + expected = [[0, 0.5]] + assert ( + IntrusionRecoveryPomdpUtil.cost_tensor(eta, states, actions, negate) + == expected + ) def test_observation_function(self) -> None: """ @@ -79,8 +90,9 @@ def test_observation_function(self) -> None: s = 1 o = 1 num_observations = 2 - expected = 0.6 - assert round(IntrusionRecoveryPomdpUtil.observation_function(s,o,num_observations),1) + assert round( + IntrusionRecoveryPomdpUtil.observation_function(s, o, num_observations), 1 + ) def test_observation_tensor(self) -> None: """ @@ -88,13 +100,13 @@ def test_observation_tensor(self) -> None: :return: None """ - states = [0,1] - observations = [0,1] - expected = [[0.8,0.2],[0.4,0.6]] - obs_tensor = IntrusionRecoveryPomdpUtil.observation_tensor(states,observations) + states = [0, 1] + observations = [0, 1] + expected = [[0.8, 0.2], [0.4, 0.6]] + obs_tensor = IntrusionRecoveryPomdpUtil.observation_tensor(states, observations) for i in range(len(obs_tensor)): for j in range(len(obs_tensor[i])): - obs_tensor[i][j] = round(obs_tensor[i][j],1) + obs_tensor[i][j] = round(obs_tensor[i][j], 1) assert sum(obs_tensor[i]) == 1 assert obs_tensor == expected @@ -111,7 +123,15 @@ def test_transition_function(self) -> None: p_c_1 = 0.1 p_c_2 = 0.2 p_u = 0.5 - assert round(IntrusionRecoveryPomdpUtil.transition_function(s,s_prime,a,p_a,p_c_1,p_c_2,p_u),1) == 0.2 + assert ( + round( + IntrusionRecoveryPomdpUtil.transition_function( + s, s_prime, a, p_a, p_c_1, p_c_2, p_u + ), + 1, + ) + == 0.2 + ) def test_transition_tensor(self) -> None: """ @@ -119,19 +139,26 @@ def test_transition_tensor(self) -> None: :return: None """ - states = [0,1] + states = [0, 1, 2] actions = [0] p_a = 0.2 p_c_1 = 0.1 p_c_2 = 0.2 p_u = 0.5 - expected = [[[0.7,0.2],[0.4,0.4]]] - transition_tensor = IntrusionRecoveryPomdpUtil.transition_tensor(states,actions,p_a,p_c_1,p_c_2,p_u) + expected = [[[0.7, 0.2, 0.1], [0.4, 0.4, 0.2], [0, 0, 1.0]]] + transition_tensor = IntrusionRecoveryPomdpUtil.transition_tensor( + states, actions, p_a, p_c_1, p_c_2, p_u + ) for i in range(len(transition_tensor)): for j in range(len(transition_tensor[i])): for k in range(len(transition_tensor[i][j])): - transition_tensor[i][j][k] = round(transition_tensor[i][j][k],1) + transition_tensor[i][j][k] = round(transition_tensor[i][j][k], 1) assert transition_tensor == expected + states = [0, 1] + with pytest.raises(AssertionError): + transition_tensor = IntrusionRecoveryPomdpUtil.transition_tensor( + states, actions, p_a, p_c_1, p_c_2, p_u + ) def test_sample_initial_state(self) -> None: """ @@ -139,10 +166,14 @@ def test_sample_initial_state(self) -> None: :return: None """ - b1 = [0.2,0.8] - assert isinstance(IntrusionRecoveryPomdpUtil.sample_initial_state(b1),int) + b1 = [0.2, 0.8] + assert isinstance(IntrusionRecoveryPomdpUtil.sample_initial_state(b1), int) assert IntrusionRecoveryPomdpUtil.sample_initial_state(b1) <= len(b1) assert IntrusionRecoveryPomdpUtil.sample_initial_state(b1) >= 0 + b1 = [1.0, 0] + assert IntrusionRecoveryPomdpUtil.sample_initial_state(b1) == 0 + b1 = [0.0, 1.0] + assert IntrusionRecoveryPomdpUtil.sample_initial_state(b1) == 1 def test_sampe_next_observation(self) -> None: """ @@ -150,10 +181,15 @@ def test_sampe_next_observation(self) -> None: :return: None """ - observation_tensor = [[0.8,0.2],[0.4,0.6]] + observation_tensor = [[0.8, 0.2], [0.4, 0.6]] s_prime = 1 - observations = [0,1] - assert isinstance(IntrusionRecoveryPomdpUtil.sample_next_observation(observation_tensor,s_prime,observations),int) + observations = [0, 1] + assert isinstance( + IntrusionRecoveryPomdpUtil.sample_next_observation( + observation_tensor, s_prime, observations + ), + int, + ) def test_bayes_filter(self) -> None: """ @@ -164,13 +200,28 @@ def test_bayes_filter(self) -> None: s_prime = 1 o = 0 a = 0 - b = [0.2,0.8] - states = [0,1] - observations = [0,1] - observation_tensor = [[0.8,0.2],[0.4,0.6]] - transition_tensor = [[[0.6,0.4],[0.1,0.9]]] + b = [0.2, 0.8] + states = [0, 1] + observations = [0, 1] + observation_tensor = [[0.8, 0.2], [0.4, 0.6]] + transition_tensor = [[[0.6, 0.4], [0.1, 0.9]]] b_prime_s_prime = 0.7 - assert round(IntrusionRecoveryPomdpUtil.bayes_filter(s_prime,o,a,b,states,observations,observation_tensor,transition_tensor),1) == b_prime_s_prime + assert ( + round( + IntrusionRecoveryPomdpUtil.bayes_filter( + s_prime, + o, + a, + b, + states, + observations, + observation_tensor, + transition_tensor, + ), + 1, + ) + == b_prime_s_prime + ) def test_p_o_given_b_a1_a2(self) -> None: """ @@ -179,13 +230,21 @@ def test_p_o_given_b_a1_a2(self) -> None: :return: None """ o = 0 - b = [0.2,0.8] + b = [0.2, 0.8] a = 0 - states = [0,1] - observation_tensor = [[0.8,0.2],[0.4,0.6]] - transition_tensor = [[[0.6,0.4],[0.1,0.9]]] + states = [0, 1] + observation_tensor = [[0.8, 0.2], [0.4, 0.6]] + transition_tensor = [[[0.6, 0.4], [0.1, 0.9]]] expected = 0.5 - assert round(IntrusionRecoveryPomdpUtil.p_o_given_b_a1_a2(o,b,a,states,transition_tensor,observation_tensor),1) == expected + assert ( + round( + IntrusionRecoveryPomdpUtil.p_o_given_b_a1_a2( + o, b, a, states, transition_tensor, observation_tensor + ), + 1, + ) + == expected + ) def test_next_belief(self) -> None: """ @@ -195,12 +254,28 @@ def test_next_belief(self) -> None: """ o = 0 a = 0 - b = [0.2,0.8] - states = [0,1] - observations = [0,1] - observation_tensor = [[0.8,0.2],[0.4,0.6]] - transition_tensor = [[[0.3,0.7],[0.6,0.4]]] - assert round(sum(IntrusionRecoveryPomdpUtil.next_belief(o,a,b,states,observations,observation_tensor,transition_tensor)),1) == 1 + b = [0.2, 0.8] + states = [0, 1] + observations = [0, 1] + observation_tensor = [[0.8, 0.2], [0.4, 0.6]] + transition_tensor = [[[0.3, 0.7], [0.6, 0.4]]] + assert ( + round( + sum( + IntrusionRecoveryPomdpUtil.next_belief( + o, + a, + b, + states, + observations, + observation_tensor, + transition_tensor, + ) + ), + 1, + ) + == 1 + ) def test_pomdp_solver_file(self) -> None: """ @@ -208,7 +283,31 @@ def test_pomdp_solver_file(self) -> None: :return: None """ - - assert IntrusionRecoveryPomdpUtil.pomdp_solver_file(IntrusionRecoveryPomdpConfig(eta=0.1,p_a=0.2,p_c_1=0.2,p_c_2=0.3,p_u=0.3,BTR=1,negate_costs=True,seed=1,discount_factor = 0.5, - states=[0,1],actions=[0],observations=[0,1],cost_tensor=[[0.1,0.5],[0.5,0.6]],observation_tensor = [[0.8,0.2],[0.4,0.6]], - transition_tensor = [[[0.8,0.2],[0.6,0.4]]],b1=[0.3,0.7],T=3,simulation_env_name="env",gym_env_name="gym",max_horizon=np.inf)) is not None + + assert ( + IntrusionRecoveryPomdpUtil.pomdp_solver_file( + IntrusionRecoveryPomdpConfig( + eta=0.1, + p_a=0.2, + p_c_1=0.2, + p_c_2=0.3, + p_u=0.3, + BTR=1, + negate_costs=True, + seed=1, + discount_factor=0.5, + states=[0, 1], + actions=[0], + observations=[0, 1], + cost_tensor=[[0.1, 0.5], [0.5, 0.6]], + observation_tensor=[[0.8, 0.2], [0.4, 0.6]], + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + b1=[0.3, 0.7], + T=3, + simulation_env_name="env", + gym_env_name="gym", + max_horizon=np.inf, + ) + ) + is not None + ) diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py index 86998ca70..06f3043e0 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_util.py @@ -5,7 +5,7 @@ class TestInstrusionToleranceCmdpSuite: """ Test suite for intrusion_response_cmdp_util.py """ - + def test_state_space(self) -> None: """ Tests the state space of the CMDP @@ -13,17 +13,17 @@ def test_state_space(self) -> None: :return: None """ assert len(IntrusionResponseCmdpUtil.state_space(2)) != 0 - assert IntrusionResponseCmdpUtil.state_space(1) is not None + assert IntrusionResponseCmdpUtil.state_space(1) is not None assert IntrusionResponseCmdpUtil.state_space(0) == [0] - assert all(isinstance(n,int) for n in IntrusionResponseCmdpUtil.state_space(3)) - + assert all(isinstance(n, int) for n in IntrusionResponseCmdpUtil.state_space(3)) + def test_action_space(self) -> None: """ Tests the action space of the CMDP :return: None """ - assert IntrusionResponseCmdpUtil.action_space() == [0,1] + assert IntrusionResponseCmdpUtil.action_space() == [0, 1] def test_cost_function(self) -> None: """ @@ -32,10 +32,10 @@ def test_cost_function(self) -> None: :return: None """ s = 1 - assert IntrusionResponseCmdpUtil.cost_function(s,True) == -1.0 - assert IntrusionResponseCmdpUtil.cost_function(s,False) == 1.0 - assert isinstance(IntrusionResponseCmdpUtil.cost_function(s,False),float) - assert isinstance(IntrusionResponseCmdpUtil.cost_function(s,True),float) + assert IntrusionResponseCmdpUtil.cost_function(s, True) == -1.0 + assert IntrusionResponseCmdpUtil.cost_function(s, False) == 1.0 + assert isinstance(IntrusionResponseCmdpUtil.cost_function(s, False), float) + assert isinstance(IntrusionResponseCmdpUtil.cost_function(s, True), float) def test_cost_tensor(self) -> None: """ @@ -43,10 +43,12 @@ def test_cost_tensor(self) -> None: :return: None """ - states = [0,1,2,3] - expected = [-0.0,-1.0,-2.0,-3.0] - assert IntrusionResponseCmdpUtil.cost_tensor(states,True) == expected - assert IntrusionResponseCmdpUtil.cost_tensor(states,False) == [-item for item in expected] + states = [0, 1, 2, 3] + expected = [-0.0, -1.0, -2.0, -3.0] + assert IntrusionResponseCmdpUtil.cost_tensor(states, True) == expected + assert IntrusionResponseCmdpUtil.cost_tensor(states, False) == [ + -item for item in expected + ] def test_constraint_cost_function(self) -> None: """ @@ -54,22 +56,28 @@ def test_constraint_cost_function(self) -> None: :return: None """ - states = [1,2] + states = [1, 2] f = 0.5 - expected = [0.0,1.0] - assert IntrusionResponseCmdpUtil.constraint_cost_function(states[0],f) == expected[0] - assert IntrusionResponseCmdpUtil.constraint_cost_function(states[1],f) == expected[1] + expected = [0.0, 1.0] + assert ( + IntrusionResponseCmdpUtil.constraint_cost_function(states[0], f) + == expected[0] + ) + assert ( + IntrusionResponseCmdpUtil.constraint_cost_function(states[1], f) + == expected[1] + ) def test_constraint_cost_tensor(self) -> None: """ Tests the function of creating a tensor with the constrained costs of the CMDP - :return: None + :return: None """ - states = [1,2] + states = [1, 2] f = 0.5 - expected = [0.0,1.0] - assert IntrusionResponseCmdpUtil.constraint_cost_tensor(states,f) == expected + expected = [0.0, 1.0] + assert IntrusionResponseCmdpUtil.constraint_cost_tensor(states, f) == expected def test_delta_function(self) -> None: """ @@ -84,7 +92,15 @@ def test_delta_function(self) -> None: p_u = 0.7 delta = 1 expected = 1.35 - assert round(IntrusionResponseCmdpUtil.delta_function(s,p_a,p_c,p_u,delta,s_max),2) == expected + assert ( + round( + IntrusionResponseCmdpUtil.delta_function( + s, p_a, p_c, p_u, delta, s_max + ), + 2, + ) + == expected + ) def test_transition_function(self) -> None: """ @@ -100,7 +116,10 @@ def test_transition_function(self) -> None: p_c = 0.3 p_u = 0.5 delta = s_prime - a - s - assert IntrusionResponseCmdpUtil.delta_function(s,p_a,p_c,p_u,delta,s_max) == 3 + assert ( + IntrusionResponseCmdpUtil.delta_function(s, p_a, p_c, p_u, delta, s_max) + == 3 + ) def test_transition_tensor(self) -> None: """ @@ -108,30 +127,18 @@ def test_transition_tensor(self) -> None: :return: None """ - states = [0,1] + states = [0, 1] actions = [0] s_max = 2 p_a = 0.2 p_c = 0.3 p_u = 0.5 - expected = [[[3/5,2/5],[2/5,3/5]]] - transition_tensor = IntrusionResponseCmdpUtil.transition_tensor(states,actions,p_a,p_c,p_u,s_max) + expected = [[[3 / 5, 2 / 5], [2 / 5, 3 / 5]]] + transition_tensor = IntrusionResponseCmdpUtil.transition_tensor( + states, actions, p_a, p_c, p_u, s_max + ) for i in range(len(transition_tensor)): for j in range(len(transition_tensor[i])): for k in range(len(transition_tensor[i][j])): - transition_tensor[i][j][k] = round(transition_tensor[i][j][k],1) + transition_tensor[i][j][k] = round(transition_tensor[i][j][k], 1) assert transition_tensor == expected - - - - - - - - - - - - - - diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_pomdp_env.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_pomdp_env.py new file mode 100644 index 000000000..8751fb423 --- /dev/null +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_pomdp_env.py @@ -0,0 +1,326 @@ +from csle_tolerance.dao.intrusion_response_cmdp_config import ( + IntrusionResponseCmdpConfig, +) +from csle_tolerance.envs.intrusion_response_cmdp_env import IntrusionResponseCmdpEnv +from unittest.mock import patch, Mock +import csle_tolerance.constants.constants as env_constants +import csle_common.constants.constants as constants +import pytest + + +class TestInstrusionResponseCmdpEnvSuite: + """ + Test suite for intrusion_response_pomdp_env.py + """ + + def test__init__(self) -> None: + """ + Tests the function of initializing the environment + + :return: None + """ + env = IntrusionResponseCmdpConfig( + p_a=0.2, + p_c=0.5, + p_u=0.3, + s_max=2, + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + cost_tensor=[0.1, 0.5], + negate_costs=True, + seed=1, + states=[0, 1], + actions=[0], + initial_state=0, + constraint_cost_tensor=[0.2, 0.7], + f=1, + epsilon_a=0.3, + simulation_env_name="env", + gym_env_name="gym", + discount_factor=0.5, + ) + assert IntrusionResponseCmdpEnv(env).t == 1 + assert IntrusionResponseCmdpEnv(env).s == env.initial_state + assert IntrusionResponseCmdpEnv(env).traces == [] + + def test_step(self) -> None: + """ + Tests the step function (Takes a step in the environment + by executing the given action) + + :return: None + """ + env = IntrusionResponseCmdpConfig( + p_a=0.2, + p_c=0.5, + p_u=0.3, + s_max=2, + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + cost_tensor=[0.1, 0.5], + negate_costs=True, + seed=1, + states=[0, 1], + actions=[0], + initial_state=0, + constraint_cost_tensor=[0.2, 0.7], + f=1, + epsilon_a=0.3, + simulation_env_name="env", + gym_env_name="gym", + discount_factor=0.5, + ) + a = 0 + c = 0.1 + assert IntrusionResponseCmdpEnv(env).step(a)[1] == c + assert not IntrusionResponseCmdpEnv(env).step(a)[2] + assert ( + IntrusionResponseCmdpEnv(env).step(a)[4][ + env_constants.ENV_METRICS.TIME_STEP + ] + == 2 + ) + assert ( + IntrusionResponseCmdpEnv(env).step(a)[4][ + env_constants.ENV_METRICS.DEFENDER_ACTION + ] + == a + ) + + def test_reset(self) -> None: + """ + Tests the reset function (Resets the environment state) + + :return: None + """ + env = IntrusionResponseCmdpConfig( + p_a=0.2, + p_c=0.5, + p_u=0.3, + s_max=2, + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + cost_tensor=[0.1, 0.5], + negate_costs=True, + seed=1, + states=[0, 1], + actions=[0], + initial_state=0, + constraint_cost_tensor=[0.2, 0.7], + f=1, + epsilon_a=0.3, + simulation_env_name="env", + gym_env_name="gym", + discount_factor=0.5, + ) + assert IntrusionResponseCmdpEnv(env).reset()[0] == 0 + assert not IntrusionResponseCmdpEnv(env).reset()[1] + + def test_info(self) -> None: + """ + Tests the function of adding the cumulative reward and episode length to the info dict + + :return: None + """ + env = IntrusionResponseCmdpConfig( + p_a=0.2, + p_c=0.5, + p_u=0.3, + s_max=2, + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + cost_tensor=[0.1, 0.5], + negate_costs=True, + seed=1, + states=[0, 1], + actions=[0], + initial_state=0, + constraint_cost_tensor=[0.2, 0.7], + f=1, + epsilon_a=0.3, + simulation_env_name="env", + gym_env_name="gym", + discount_factor=0.5, + ) + info = {} + assert IntrusionResponseCmdpEnv(env)._info(info) is not None + + def test_render(self) -> None: + """ + Tests the funtion of rendering the environment + + :return: None + """ + env = IntrusionResponseCmdpConfig( + p_a=0.2, + p_c=0.5, + p_u=0.3, + s_max=2, + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + cost_tensor=[0.1, 0.5], + negate_costs=True, + seed=1, + states=[0, 1], + actions=[0], + initial_state=0, + constraint_cost_tensor=[0.2, 0.7], + f=1, + epsilon_a=0.3, + simulation_env_name="env", + gym_env_name="gym", + discount_factor=0.5, + ) + with pytest.raises(NotImplementedError): + IntrusionResponseCmdpEnv(env).render() + + def test_get_traces(self) -> None: + """ + Tests the function of getting the list of simulation traces + + :return: None + """ + env = IntrusionResponseCmdpConfig( + p_a=0.2, + p_c=0.5, + p_u=0.3, + s_max=2, + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + cost_tensor=[0.1, 0.5], + negate_costs=True, + seed=1, + states=[0, 1], + actions=[0], + initial_state=0, + constraint_cost_tensor=[0.2, 0.7], + f=1, + epsilon_a=0.3, + simulation_env_name="env", + gym_env_name="gym", + discount_factor=0.5, + ) + returned_traces = IntrusionResponseCmdpEnv(env).get_traces() + assert not returned_traces + + def test_reset_traces(self) -> None: + """ + Tests the function of reseting the list of traces + + :return: None + """ + env = IntrusionResponseCmdpConfig( + p_a=0.2, + p_c=0.5, + p_u=0.3, + s_max=2, + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + cost_tensor=[0.1, 0.5], + negate_costs=True, + seed=1, + states=[0, 1], + actions=[0], + initial_state=0, + constraint_cost_tensor=[0.2, 0.7], + f=1, + epsilon_a=0.3, + simulation_env_name="env", + gym_env_name="gym", + discount_factor=0.5, + ) + assert not IntrusionResponseCmdpEnv(env).reset_traces() + + @patch("time.time") # Mock the time.time function + @patch( + "csle_common.dao.simulation_config.simulation_trace.SimulationTrace.save_traces" + ) # Mock the method + def test_checkpoint_traces(self, mock_save_traces, mock_time) -> None: + """ + Tests the function of checkpointing agent traces + + :param mock_save_traces: _description_ + :type mock_save_traces: _type_ + :param mock_time: _description_ + :type mock_time: _type_ + """ + env = IntrusionResponseCmdpConfig( + p_a=0.2, + p_c=0.5, + p_u=0.3, + s_max=2, + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + cost_tensor=[0.1, 0.5], + negate_costs=True, + seed=1, + states=[0, 1], + actions=[0], + initial_state=0, + constraint_cost_tensor=[0.2, 0.7], + f=1, + epsilon_a=0.3, + simulation_env_name="env", + gym_env_name="gym", + discount_factor=0.5, + ) + mock_time.return_value = 1234567890 + environment = IntrusionResponseCmdpEnv(env) + environment._IntrusionResponseCmdpEnv__checkpoint_traces() + mock_save_traces.assert_called_once_with( + traces_save_dir=constants.LOGGING.DEFAULT_LOG_DIR, + traces=environment.traces, + traces_file=f"taus{mock_time.return_value}.json", + ) + + def test_set_model(self) -> None: + """ + Tests the function of setting the model + + :return: None + """ + mock_model = Mock() + env = IntrusionResponseCmdpConfig( + p_a=0.2, + p_c=0.5, + p_u=0.3, + s_max=2, + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + cost_tensor=[0.1, 0.5], + negate_costs=True, + seed=1, + states=[0, 1], + actions=[0], + initial_state=0, + constraint_cost_tensor=[0.2, 0.7], + f=1, + epsilon_a=0.3, + simulation_env_name="env", + gym_env_name="gym", + discount_factor=0.5, + ) + environment = IntrusionResponseCmdpEnv(env) + environment.set_model(mock_model) + assert environment.model == mock_model + + def test_set_state(self) -> None: + """ + Tests the function of setting the state + + :return: None + """ + mock_state = Mock() + env = IntrusionResponseCmdpConfig( + p_a=0.2, + p_c=0.5, + p_u=0.3, + s_max=2, + transition_tensor=[[[0.8, 0.2], [0.6, 0.4]]], + cost_tensor=[0.1, 0.5], + negate_costs=True, + seed=1, + states=[0, 1], + actions=[0], + initial_state=0, + constraint_cost_tensor=[0.2, 0.7], + f=1, + epsilon_a=0.3, + simulation_env_name="env", + gym_env_name="gym", + discount_factor=0.5, + ) + environment = IntrusionResponseCmdpEnv(env) + environment.set_state(mock_state) + assert environment.s == mock_state diff --git a/simulation-system/libs/csle-tolerance/tests/test_pomdp_solve_parser.py b/simulation-system/libs/csle-tolerance/tests/test_pomdp_solve_parser.py new file mode 100644 index 000000000..23d6a41d7 --- /dev/null +++ b/simulation-system/libs/csle-tolerance/tests/test_pomdp_solve_parser.py @@ -0,0 +1,54 @@ +from csle_tolerance.util.pomdp_solve_parser import PomdpSolveParser +import pytest +import tempfile +import numpy as np + + +class TestInstrusionTolerancePomdpSolveParserSuite: + """ + Test suite for pomdp_solve_parser.py + """ + + def test_parse_alpha_vectors(self) -> None: + """ + Tests the function of parsing alpha vectors from a given file location + + :return: None + """ + # test for a scenario where the file does not exist + with pytest.raises(FileNotFoundError): + file_path = "file.txt" + PomdpSolveParser.parse_alpha_vectors(file_path) + # test case for correct parsing + file_content = "0\n0.1 0.2 0.3\n1\n0.4 0.5 0.6" + with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: + f.write(file_content) + f.flush() + file_path = f.name + result = PomdpSolveParser.parse_alpha_vectors(file_path) + expected = [(0, [0.1, 0.2, 0.3]), (1, [0.4, 0.5, 0.6])] + assert result == expected + # test case for incorrect file format + file_content = "Invalid content" + with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: + f.write(file_content) + f.flush() + file_path = f.name + result = PomdpSolveParser.parse_alpha_vectors(file_path) + assert result == [] + + def test_optimal_avg_value(self) -> None: + """ + Tests the function of computing the optimal average value given a set of alpha vectors + + :return: None + """ + alpha_vectors_content = "0\n0.1 0.2 0.3\n1\n0.4 0.5 0.6" + initial_belief = 0.5 + btr = 10 + with tempfile.NamedTemporaryFile(mode="w") as f: + f.write(alpha_vectors_content) + f.flush() + result = PomdpSolveParser.optimal_avg_value(f.name, initial_belief, btr) + expected = -0.45 / btr + assert np.isclose(result, expected) From fab0bcf85efad0d353cf8832fbf02431c0c9ccc7 Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Tue, 14 May 2024 18:08:13 +0200 Subject: [PATCH 11/16] dao tests --- .../test_intrusion_recovery_pomdp_config.py | 265 ++++++++++++++++++ 1 file changed, 265 insertions(+) create mode 100644 simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_config.py diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_config.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_config.py new file mode 100644 index 000000000..0e9428958 --- /dev/null +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_pomdp_config.py @@ -0,0 +1,265 @@ +from csle_tolerance.dao.intrusion_recovery_pomdp_config import ( + IntrusionRecoveryPomdpConfig, +) +from csle_tolerance.util.intrusion_recovery_pomdp_util import IntrusionRecoveryPomdpUtil +import pytest_mock +import numpy as np + + +class TestIntrusionRecoveryPomdpConfigSuite: + """ + Test suite for intrusion_recovery_pomdp_config.py + """ + + def test__init__(self) -> None: + """ + Tests the function of initializing the DTO + """ + dto = IntrusionRecoveryPomdpConfig( + eta=0.5, + p_a=0.8, + p_c_1=0.1, + p_c_2=0.2, + p_u=0.3, + BTR=10, + negate_costs=True, + seed=123, + discount_factor=0.9, + states=[0, 1, 2], + actions=[0, 1], + observations=[0, 1], + cost_tensor=[[1, 2], [3, 4]], + observation_tensor=[[1, 2], [3, 4]], + transition_tensor=[[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + b1=[0.1, 0.9], + T=100, + simulation_env_name="sim_env", + gym_env_name="gym_env", + max_horizon=1000, + ) + assert dto.eta == 0.5 + assert dto.p_a == 0.8 + assert dto.p_c_1 == 0.1 + assert dto.p_c_2 == 0.2 + assert dto.p_u == 0.3 + assert dto.BTR == 10 + assert dto.negate_costs is True + assert dto.seed == 123 + assert dto.discount_factor == 0.9 + assert dto.states == [0, 1, 2] + assert dto.actions == [0, 1] + assert dto.observations == [0, 1] + assert dto.cost_tensor == [[1, 2], [3, 4]] + assert dto.observation_tensor == [[1, 2], [3, 4]] + assert dto.transition_tensor == [ + [[0.1, 0.2], [0.3, 0.4]], + [[0.5, 0.6], [0.7, 0.8]], + ] + assert dto.b1 == [0.1, 0.9] + assert dto.T == 100 + assert dto.simulation_env_name == "sim_env" + assert dto.gym_env_name == "gym_env" + assert dto.max_horizon == 1000 + + def test__str__(self) -> None: + """ + Tests the function of returning a string representation of the DTO + """ + dto = IntrusionRecoveryPomdpConfig( + eta=0.5, + p_a=0.8, + p_c_1=0.1, + p_c_2=0.2, + p_u=0.3, + BTR=10, + negate_costs=True, + seed=123, + discount_factor=0.9, + states=[0, 1, 2], + actions=[0, 1], + observations=[0, 1], + cost_tensor=[[1, 2], [3, 4]], + observation_tensor=[[1, 2], [3, 4]], + transition_tensor=[[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + b1=[0.1, 0.9], + T=100, + simulation_env_name="sim_env", + gym_env_name="gym_env", + max_horizon=1000, + ) + expected = ( + "eta: 0.5, p_a: 0.8, p_c_1: 0.1, p_c_2: 0.2, p_u: 0.3, BTR: 10, negate_costs: True, seed: 123, " + "discount_factor: 0.9, states: [0, 1, 2], actions: [0, 1], observations: [[1, 2], [3, 4]], " + "cost_tensor: [[1, 2], [3, 4]], observation_tensor: [[1, 2], [3, 4]], " + "transition_tensor: [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], b1:[0.1, 0.9], " + "T: 100, simulation_env_name: sim_env, gym_env_name: gym_env, max_horizon: 1000" + ) + assert dto.__str__() == expected + + def test_from_dict(self) -> None: + """ + Tests the function of converting a dict representation to an instance + """ + dto_dict = { + "eta": 0.5, + "p_a": 0.8, + "p_c_1": 0.1, + "p_c_2": 0.2, + "p_u": 0.3, + "BTR": 10, + "negate_costs": True, + "seed": 123, + "discount_factor": 0.9, + "states": [0, 1, 2], + "actions": [0, 1], + "observations": [0, 1], + "cost_tensor": [[1, 2], [3, 4]], + "observation_tensor": [[1, 2], [3, 4]], + "transition_tensor": [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + "b1": [0.1, 0.9], + "T": 100, + "simulation_env_name": "sim_env", + "gym_env_name": "gym_env", + } + dto = IntrusionRecoveryPomdpConfig.from_dict(dto_dict) + assert dto.eta == 0.5 + assert dto.p_a == 0.8 + assert dto.p_c_1 == 0.1 + assert dto.p_c_2 == 0.2 + assert dto.p_u == 0.3 + assert dto.BTR == 10 + assert dto.negate_costs is True + assert dto.seed == 123 + assert dto.discount_factor == 0.9 + assert dto.states == [0, 1, 2] + assert dto.actions == [0, 1] + assert dto.observations == [0, 1] + assert dto.cost_tensor == [[1, 2], [3, 4]] + assert dto.observation_tensor == [[1, 2], [3, 4]] + assert dto.transition_tensor == [ + [[0.1, 0.2], [0.3, 0.4]], + [[0.5, 0.6], [0.7, 0.8]], + ] + assert dto.b1 == [0.1, 0.9] + assert dto.T == 100 + assert dto.simulation_env_name == "sim_env" + assert dto.gym_env_name == "gym_env" + + def test_to_dict(self) -> None: + """ + Tests the function of getting a dict representation of the object + """ + dto = IntrusionRecoveryPomdpConfig( + eta=0.5, + p_a=0.8, + p_c_1=0.1, + p_c_2=0.2, + p_u=0.3, + BTR=10, + negate_costs=True, + seed=123, + discount_factor=0.9, + states=[0, 1, 2], + actions=[0, 1], + observations=[0, 1], + cost_tensor=[[1, 2], [3, 4]], + observation_tensor=[[1, 2], [3, 4]], + transition_tensor=[[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + b1=[0.1, 0.9], + T=100, + simulation_env_name="sim_env", + gym_env_name="gym_env", + ) + expected = { + "eta": 0.5, + "p_a": 0.8, + "p_c_1": 0.1, + "p_c_2": 0.2, + "p_u": 0.3, + "BTR": 10, + "negate_costs": True, + "seed": 123, + "discount_factor": 0.9, + "states": [0, 1, 2], + "actions": [0, 1], + "observations": [0, 1], + "cost_tensor": [[1, 2], [3, 4]], + "observation_tensor": [[1, 2], [3, 4]], + "transition_tensor": [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + "b1": [0.1, 0.9], + "T": 100, + "simulation_env_name": "sim_env", + "gym_env_name": "gym_env", + } + assert dto.to_dict() == expected + + def test_from_json_file(self, mocker: pytest_mock.MockFixture) -> None: + """ + tests the function of reading a json file and converting it to a DTO + """ + eta = 2 + p_a = 0.05 + p_c_1 = 0.01 + p_c_2 = 0.01 + p_u = 0.0 + BTR = np.inf + negate_costs = False + discount_factor = 1 - p_c_1 + num_observations = 100 + simulation_name = "csle-tolerance-intrusion-recovery-pomdp-defender-001" + cost_tensor = IntrusionRecoveryPomdpUtil.cost_tensor( + eta=eta, + states=IntrusionRecoveryPomdpUtil.state_space(), + actions=IntrusionRecoveryPomdpUtil.action_space(), + negate=negate_costs, + ) + observation_tensor = IntrusionRecoveryPomdpUtil.observation_tensor( + states=IntrusionRecoveryPomdpUtil.state_space(), + observations=IntrusionRecoveryPomdpUtil.observation_space( + num_observations=num_observations + ), + ) + transition_tensor = IntrusionRecoveryPomdpUtil.transition_tensor( + states=IntrusionRecoveryPomdpUtil.state_space(), + actions=IntrusionRecoveryPomdpUtil.action_space(), + p_a=p_a, + p_c_1=p_c_1, + p_c_2=p_c_2, + p_u=p_u, + ) + config = IntrusionRecoveryPomdpConfig( + eta=eta, + p_a=p_a, + p_c_1=p_c_1, + p_c_2=p_c_2, + p_u=p_u, + BTR=BTR, + negate_costs=negate_costs, + seed=999, + discount_factor=discount_factor, + states=IntrusionRecoveryPomdpUtil.state_space(), + actions=IntrusionRecoveryPomdpUtil.action_space(), + observations=IntrusionRecoveryPomdpUtil.observation_space( + num_observations=num_observations + ), + cost_tensor=cost_tensor, + observation_tensor=observation_tensor, + transition_tensor=transition_tensor, + b1=IntrusionRecoveryPomdpUtil.initial_belief(p_a=p_a), + T=BTR, + simulation_env_name=simulation_name, + gym_env_name="csle-tolerance-intrusion-recovery-pomdp-v1", + ) + mocker.patch( + "io.open", side_effect=mocker.mock_open(read_data=config.to_json_str()) + ) + dto = IntrusionRecoveryPomdpConfig.from_json_file("path") + assert dto.eta == 2 + assert dto.p_a == 0.05 + assert dto.p_c_1 == 0.01 + assert dto.p_c_2 == 0.01 + assert dto.p_u == 0.0 + assert dto.BTR == np.inf + assert dto.negate_costs is False + assert dto.seed == 999 + assert dto.discount_factor == 1 - p_c_1 From 85f2f83843c77b687acc19de17fd975d9673cc66 Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Wed, 15 May 2024 09:55:15 +0200 Subject: [PATCH 12/16] add tests --- .../dao/intrusion_recovery_pomdp_config.py | 9 +- .../envs/intrusion_recovery_pomdp_env.py | 1 + .../src/csle_tolerance/util/general_util.py | 4 +- .../util/intrusion_recovery_pomdp_util.py | 1 + .../test_intrusion_response_cmdp_config.py | 283 ++++++++++++++++++ 5 files changed, 294 insertions(+), 4 deletions(-) create mode 100644 simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_config.py diff --git a/simulation-system/libs/csle-tolerance/src/csle_tolerance/dao/intrusion_recovery_pomdp_config.py b/simulation-system/libs/csle-tolerance/src/csle_tolerance/dao/intrusion_recovery_pomdp_config.py index de0f2aa7b..bec3d32e4 100644 --- a/simulation-system/libs/csle-tolerance/src/csle_tolerance/dao/intrusion_recovery_pomdp_config.py +++ b/simulation-system/libs/csle-tolerance/src/csle_tolerance/dao/intrusion_recovery_pomdp_config.py @@ -1,5 +1,8 @@ from typing import List, Dict, Any import numpy as np +import logging +import io +import json from csle_common.dao.simulation_config.simulation_env_input_config import SimulationEnvInputConfig @@ -111,7 +114,7 @@ def to_dict(self) -> Dict[str, Any]: d["b1"] = self.b1 d["T"] = self.T d["simulation_env_name"] = self.simulation_env_name - d["gym_env_name"] = self.simulation_env_name + d["gym_env_name"] = self.gym_env_name return d @staticmethod @@ -122,8 +125,8 @@ def from_json_file(json_file_path: str) -> "IntrusionRecoveryPomdpConfig": :param json_file_path: the json file path :return: the converted DTO """ - import io - import json + logging.info(msg="msg") with io.open(json_file_path, 'r') as f: json_str = f.read() + print(json_str) return IntrusionRecoveryPomdpConfig.from_dict(json.loads(json_str)) diff --git a/simulation-system/libs/csle-tolerance/src/csle_tolerance/envs/intrusion_recovery_pomdp_env.py b/simulation-system/libs/csle-tolerance/src/csle_tolerance/envs/intrusion_recovery_pomdp_env.py index 8c4e983a5..054395d3a 100644 --- a/simulation-system/libs/csle-tolerance/src/csle_tolerance/envs/intrusion_recovery_pomdp_env.py +++ b/simulation-system/libs/csle-tolerance/src/csle_tolerance/envs/intrusion_recovery_pomdp_env.py @@ -146,6 +146,7 @@ def _info(self, info: Dict[str, Any]) -> Dict[str, Any]: upper_bound_return = 0 s = self.trace.states[0] for i in range(len(self.trace.states)): + print(i) if s == 0 or s == 2: a = 0 else: diff --git a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/general_util.py b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/general_util.py index ebe1a487e..d5512bae2 100644 --- a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/general_util.py +++ b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/general_util.py @@ -1,5 +1,4 @@ from typing import List -from gymnasium.envs.registration import register import math import numpy as np @@ -73,6 +72,9 @@ def register_envs() -> None: :return: None """ + #import logging + from gymnasium.envs.registration import register + #logging.info(f"mocked object?: {register}") register( id='csle-tolerance-intrusion-recovery-pomdp-v1', entry_point='csle_tolerance.envs.intrusion_recovery_pomdp_env:IntrusionRecoveryPomdpEnv', diff --git a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py index f66aaced2..4b8fbb9bf 100644 --- a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py +++ b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py @@ -179,6 +179,7 @@ def transition_tensor(states: List[int], actions: List[int], p_a: float, p_c_1: :param p_u: the upgrade probability :return: the transition tensor """ + assert states == [0,1,2] transition_tensor = [] for a in actions: a_transitions = [] diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_config.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_config.py new file mode 100644 index 000000000..7ed899fbd --- /dev/null +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_response_cmdp_config.py @@ -0,0 +1,283 @@ +from csle_tolerance.dao.intrusion_response_cmdp_config import ( + IntrusionResponseCmdpConfig, +) +import pytest_mock + + +class TestIntrusionResponseCmdpConfigSuite: + """ + Test suite for intrusion_response_cmdp_config.py + """ + + def test__init__(self) -> None: + """ + Tests the function of initializing the DTO + """ + p_a = 0.1 + p_c = 0.2 + p_u = 0.3 + s_max = 10 + transition_tensor = [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]] + cost_tensor = [0.5, 0.6] + negate_costs = True + seed = 123 + states = [0, 1] + actions = [0, 1] + initial_state = 0 + constraint_cost_tensor = [0.7, 0.8] + f = 5 + epsilon_a = 0.9 + simulation_env_name = "SimulationEnv" + gym_env_name = "GymEnv" + discount_factor = 0.95 + dto = IntrusionResponseCmdpConfig( + p_a, + p_c, + p_u, + s_max, + transition_tensor, + cost_tensor, + negate_costs, + seed, + states, + actions, + initial_state, + constraint_cost_tensor, + f, + epsilon_a, + simulation_env_name, + gym_env_name, + discount_factor, + ) + assert dto.p_a == p_a + assert dto.p_c == p_c + assert dto.p_u == p_u + assert dto.s_max == s_max + assert dto.transition_tensor == transition_tensor + assert dto.cost_tensor == cost_tensor + assert dto.negate_costs == negate_costs + assert dto.seed == seed + assert dto.states == states + assert dto.actions == actions + assert dto.initial_state == initial_state + assert dto.constraint_cost_tensor == constraint_cost_tensor + assert dto.f == f + assert dto.epsilon_a == epsilon_a + assert dto.simulation_env_name == simulation_env_name + assert dto.gym_env_name == gym_env_name + assert dto.discount_factor == discount_factor + + def test__str__(self) -> None: + """ + Tests the function of returning a string representation of the DTO + """ + p_a = 0.1 + p_c = 0.2 + p_u = 0.3 + s_max = 10 + transition_tensor = [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]] + cost_tensor = [0.5, 0.6] + negate_costs = True + seed = 123 + states = [0, 1] + actions = [0, 1] + initial_state = 0 + constraint_cost_tensor = [0.7, 0.8] + f = 5 + epsilon_a = 0.9 + simulation_env_name = "SimulationEnv" + gym_env_name = "GymEnv" + discount_factor = 0.95 + dto = IntrusionResponseCmdpConfig( + p_a, + p_c, + p_u, + s_max, + transition_tensor, + cost_tensor, + negate_costs, + seed, + states, + actions, + initial_state, + constraint_cost_tensor, + f, + epsilon_a, + simulation_env_name, + gym_env_name, + discount_factor, + ) + expected = ( + f"p_a: {p_a}, p_c: {p_c}, p_u: {p_u}, s_max: {s_max}, " + f"transition_tensor: {transition_tensor}, cost_tensor: {cost_tensor}, seed: {seed}, " + f"negate_costs: {negate_costs}, states: {states}, actions: {actions}, " + f"initial state: {initial_state}, constraint_cost_tensor: {constraint_cost_tensor}, " + f"f: {f}, epsilon_a: {epsilon_a}, simulation_env_name: {simulation_env_name}, " + f"gym_env_name: {gym_env_name}, discount_factor: {discount_factor}" + ) + assert dto.__str__() == expected + + def test_from_dict(self) -> None: + """ + Tests the function of converting a dict representation to an instance + """ + dto_dict = { + "p_a": 0.1, + "p_c": 0.2, + "p_u": 0.3, + "s_max": 10, + "transition_tensor": [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + "cost_tensor": [0.5, 0.6], + "negate_costs": True, + "seed": 123, + "states": [0, 1], + "actions": [0, 1], + "initial_state": 0, + "constraint_cost_tensor": [0.7, 0.8], + "f": 5, + "epsilon_a": 0.9, + "simulation_env_name": "SimulationEnv", + "gym_env_name": "GymEnv", + "discount_factor": 0.95, + } + dto = IntrusionResponseCmdpConfig.from_dict(dto_dict) + assert dto.p_a == dto_dict["p_a"] + assert dto.p_c == dto_dict["p_c"] + assert dto.p_u == dto_dict["p_u"] + assert dto.s_max == dto_dict["s_max"] + assert dto.transition_tensor == dto_dict["transition_tensor"] + assert dto.cost_tensor == dto_dict["cost_tensor"] + assert dto.negate_costs == dto_dict["negate_costs"] + assert dto.seed == dto_dict["seed"] + assert dto.states == dto_dict["states"] + assert dto.actions == dto_dict["actions"] + assert dto.initial_state == dto_dict["initial_state"] + assert dto.constraint_cost_tensor == dto_dict["constraint_cost_tensor"] + assert dto.f == dto_dict["f"] + assert dto.epsilon_a == dto_dict["epsilon_a"] + assert dto.simulation_env_name == dto_dict["simulation_env_name"] + assert dto.gym_env_name == dto_dict["gym_env_name"] + assert dto.discount_factor == dto_dict["discount_factor"] + + def test_to_dict(self) -> None: + """ + Tests the function of getting a dict representation of the object + """ + p_a = 0.1 + p_c = 0.2 + p_u = 0.3 + s_max = 10 + transition_tensor = [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]] + cost_tensor = [0.5, 0.6] + negate_costs = True + seed = 123 + states = [0, 1] + actions = [0, 1] + initial_state = 0 + constraint_cost_tensor = [0.7, 0.8] + f = 5 + epsilon_a = 0.9 + simulation_env_name = "SimulationEnv" + gym_env_name = "GymEnv" + discount_factor = 0.95 + dto = IntrusionResponseCmdpConfig( + p_a, + p_c, + p_u, + s_max, + transition_tensor, + cost_tensor, + negate_costs, + seed, + states, + actions, + initial_state, + constraint_cost_tensor, + f, + epsilon_a, + simulation_env_name, + gym_env_name, + discount_factor, + ) + expected = { + "p_a": 0.1, + "p_c": 0.2, + "p_u": 0.3, + "s_max": 10, + "transition_tensor": [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + "cost_tensor": [0.5, 0.6], + "negate_costs": True, + "seed": 123, + "states": [0, 1], + "actions": [0, 1], + "initial_state": 0, + "constraint_cost_tensor": [0.7, 0.8], + "f": 5, + "epsilon_a": 0.9, + "simulation_env_name": "SimulationEnv", + "gym_env_name": "GymEnv", + "discount_factor": 0.95, + } + assert dto.to_dict() == expected + + def test_from_json_file(self, mocker: pytest_mock.MockFixture) -> None: + """ + tests the function of reading a json file and converting it to a DTO + """ + p_a = 0.1 + p_c = 0.2 + p_u = 0.3 + s_max = 10 + transition_tensor = [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]] + cost_tensor = [0.5, 0.6] + negate_costs = True + seed = 123 + states = [0, 1] + actions = [0, 1] + initial_state = 0 + constraint_cost_tensor = [0.7, 0.8] + f = 5 + epsilon_a = 0.9 + simulation_env_name = "SimulationEnv" + gym_env_name = "GymEnv" + discount_factor = 0.95 + config = IntrusionResponseCmdpConfig( + p_a, + p_c, + p_u, + s_max, + transition_tensor, + cost_tensor, + negate_costs, + seed, + states, + actions, + initial_state, + constraint_cost_tensor, + f, + epsilon_a, + simulation_env_name, + gym_env_name, + discount_factor, + ) + mocker.patch( + "io.open", side_effect=mocker.mock_open(read_data=config.to_json_str()) + ) + dto = IntrusionResponseCmdpConfig.from_json_file("path") + assert dto.p_a == p_a + assert dto.p_c == p_c + assert dto.p_u == p_u + assert dto.s_max == s_max + assert dto.transition_tensor == transition_tensor + assert dto.cost_tensor == cost_tensor + assert dto.negate_costs == negate_costs + assert dto.seed == seed + assert dto.states == states + assert dto.actions == actions + assert dto.initial_state == initial_state + assert dto.constraint_cost_tensor == constraint_cost_tensor + assert dto.f == f + assert dto.epsilon_a == epsilon_a + assert dto.simulation_env_name == simulation_env_name + assert dto.gym_env_name == gym_env_name + assert dto.discount_factor == discount_factor From 2549275d54aeca727debdd26e09eeb81bc08d4b7 Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Wed, 15 May 2024 11:00:32 +0200 Subject: [PATCH 13/16] fixed small things --- .../src/csle_tolerance/envs/intrusion_recovery_pomdp_env.py | 1 - .../libs/csle-tolerance/src/csle_tolerance/util/general_util.py | 2 -- 2 files changed, 3 deletions(-) diff --git a/simulation-system/libs/csle-tolerance/src/csle_tolerance/envs/intrusion_recovery_pomdp_env.py b/simulation-system/libs/csle-tolerance/src/csle_tolerance/envs/intrusion_recovery_pomdp_env.py index 054395d3a..8c4e983a5 100644 --- a/simulation-system/libs/csle-tolerance/src/csle_tolerance/envs/intrusion_recovery_pomdp_env.py +++ b/simulation-system/libs/csle-tolerance/src/csle_tolerance/envs/intrusion_recovery_pomdp_env.py @@ -146,7 +146,6 @@ def _info(self, info: Dict[str, Any]) -> Dict[str, Any]: upper_bound_return = 0 s = self.trace.states[0] for i in range(len(self.trace.states)): - print(i) if s == 0 or s == 2: a = 0 else: diff --git a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/general_util.py b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/general_util.py index d5512bae2..2052251c4 100644 --- a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/general_util.py +++ b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/general_util.py @@ -72,9 +72,7 @@ def register_envs() -> None: :return: None """ - #import logging from gymnasium.envs.registration import register - #logging.info(f"mocked object?: {register}") register( id='csle-tolerance-intrusion-recovery-pomdp-v1', entry_point='csle_tolerance.envs.intrusion_recovery_pomdp_env:IntrusionRecoveryPomdpEnv', From b5f839ed0070a040ec66852cb33c62f0866d8186 Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Wed, 15 May 2024 11:01:42 +0200 Subject: [PATCH 14/16] fixed small things --- .../src/csle_tolerance/util/general_util.py | 26 ++-- .../util/intrusion_recovery_pomdp_util.py | 116 ++++++++++++++---- 2 files changed, 109 insertions(+), 33 deletions(-) diff --git a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/general_util.py b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/general_util.py index 2052251c4..a121064c0 100644 --- a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/general_util.py +++ b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/general_util.py @@ -21,9 +21,14 @@ def threshold_probability(b1: float, threshold: float, k=-20) -> float: return 1.0 if round(b1, 2) == 0: return 0.0 - if (threshold * (1 - b1)) > 0 and (b1 * (1 - threshold)) / (threshold * (1 - b1)) > 0: + if (threshold * (1 - b1)) > 0 and (b1 * (1 - threshold)) / ( + threshold * (1 - b1) + ) > 0: try: - return math.pow(1 + math.pow(((b1 * (1 - threshold)) / (threshold * (1 - b1))), k), -1) + return math.pow( + 1 + math.pow(((b1 * (1 - threshold)) / (threshold * (1 - b1))), k), + -1, + ) except Exception: return 0.0 else: @@ -50,7 +55,9 @@ def inverse_sigmoid(y) -> float: return math.log(y / (1 - y), math.e) @staticmethod - def sample_next_state(transition_tensor: List[List[List[float]]], s: int, a: int, states: List[int]) -> int: + def sample_next_state( + transition_tensor: List[List[List[float]]], s: int, a: int, states: List[int] + ) -> int: """ Samples the next state of a MDP or POMDP @@ -73,13 +80,14 @@ def register_envs() -> None: :return: None """ from gymnasium.envs.registration import register + register( - id='csle-tolerance-intrusion-recovery-pomdp-v1', - entry_point='csle_tolerance.envs.intrusion_recovery_pomdp_env:IntrusionRecoveryPomdpEnv', - kwargs={'config': None} + id="csle-tolerance-intrusion-recovery-pomdp-v1", + entry_point="csle_tolerance.envs.intrusion_recovery_pomdp_env:IntrusionRecoveryPomdpEnv", + kwargs={"config": None}, ) register( - id='csle-tolerance-intrusion-response-cmdp-v1', - entry_point='csle_tolerance.envs.intrusion_response_cmdp_env:IntrusionResponseCmdpEnv', - kwargs={'config': None} + id="csle-tolerance-intrusion-response-cmdp-v1", + entry_point="csle_tolerance.envs.intrusion_response_cmdp_env:IntrusionResponseCmdpEnv", + kwargs={"config": None}, ) diff --git a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py index 4b8fbb9bf..be9270bc0 100644 --- a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py +++ b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py @@ -1,7 +1,9 @@ from typing import List from scipy.stats import betabinom import numpy as np -from csle_tolerance.dao.intrusion_recovery_pomdp_config import IntrusionRecoveryPomdpConfig +from csle_tolerance.dao.intrusion_recovery_pomdp_config import ( + IntrusionRecoveryPomdpConfig, +) class IntrusionRecoveryPomdpUtil: @@ -67,7 +69,9 @@ def cost_function(s: int, a: int, eta: float, negate: bool = False) -> float: return cost @staticmethod - def cost_tensor(eta: float, states: List[int], actions: List[int], negate: bool = False) -> List[List[float]]: + def cost_tensor( + eta: float, states: List[int], actions: List[int], negate: bool = False + ) -> List[List[float]]: """ Creates a |A|x|S| tensor with the costs (or rewards) of the POMDP @@ -81,7 +85,11 @@ def cost_tensor(eta: float, states: List[int], actions: List[int], negate: bool for a in actions: a_costs = [] for s in states: - a_costs.append(IntrusionRecoveryPomdpUtil.cost_function(s=s, a=a, eta=eta, negate=negate)) + a_costs.append( + IntrusionRecoveryPomdpUtil.cost_function( + s=s, a=a, eta=eta, negate=negate + ) + ) cost_tensor.append(a_costs) return cost_tensor @@ -108,7 +116,9 @@ def observation_function(s: int, o: int, num_observations: int) -> float: return 0.0 @staticmethod - def observation_tensor(states: List[int], observations: List[int]) -> List[List[float]]: + def observation_tensor( + states: List[int], observations: List[int] + ) -> List[List[float]]: """ Creates a |S|x|O| tensor with the observation probabilities @@ -121,13 +131,18 @@ def observation_tensor(states: List[int], observations: List[int]) -> List[List[ for s in states: s_observations = [] for o in observations: - s_observations.append(IntrusionRecoveryPomdpUtil.observation_function( - s=s, o=o, num_observations=num_observations)) + s_observations.append( + IntrusionRecoveryPomdpUtil.observation_function( + s=s, o=o, num_observations=num_observations + ) + ) observation_tensor.append(s_observations) return observation_tensor @staticmethod - def transition_function(s: int, s_prime: int, a: int, p_a: float, p_c_1: float, p_u: float, p_c_2: float) -> float: + def transition_function( + s: int, s_prime: int, a: int, p_a: float, p_c_1: float, p_u: float, p_c_2: float + ) -> float: """ The transition function of the POMDP @@ -166,8 +181,14 @@ def transition_function(s: int, s_prime: int, a: int, p_a: float, p_c_1: float, return 0 @staticmethod - def transition_tensor(states: List[int], actions: List[int], p_a: float, p_c_1: float, p_c_2: float, p_u: float) \ - -> List[List[List[float]]]: + def transition_tensor( + states: List[int], + actions: List[int], + p_a: float, + p_c_1: float, + p_c_2: float, + p_u: float, + ) -> List[List[List[float]]]: """ Creates a |A|x|S|x|S| tensor with the transition probabilities of the POMDP @@ -179,15 +200,24 @@ def transition_tensor(states: List[int], actions: List[int], p_a: float, p_c_1: :param p_u: the upgrade probability :return: the transition tensor """ - assert states == [0,1,2] + assert states == [0, 1, 2] transition_tensor = [] for a in actions: a_transitions = [] for s in states: s_a_transitions = [] for s_prime in states: - s_a_transitions.append(IntrusionRecoveryPomdpUtil.transition_function( - s=s, s_prime=s_prime, a=a, p_a=p_a, p_c_1=p_c_1, p_c_2=p_c_2, p_u=p_u)) + s_a_transitions.append( + IntrusionRecoveryPomdpUtil.transition_function( + s=s, + s_prime=s_prime, + a=a, + p_a=p_a, + p_c_1=p_c_1, + p_c_2=p_c_2, + p_u=p_u, + ) + ) a_transitions.append(s_a_transitions) transition_tensor.append(a_transitions) return transition_tensor @@ -203,7 +233,9 @@ def sample_initial_state(b1: List[float]) -> int: return int(np.random.choice(np.arange(0, len(b1)), p=b1)) @staticmethod - def sample_next_observation(observation_tensor: List[List[float]], s_prime: int, observations: List[int]) -> int: + def sample_next_observation( + observation_tensor: List[List[float]], s_prime: int, observations: List[int] + ) -> int: """ Samples the next observation @@ -219,8 +251,16 @@ def sample_next_observation(observation_tensor: List[List[float]], s_prime: int, return int(o) @staticmethod - def bayes_filter(s_prime: int, o: int, a: int, b: List[float], states: List[int], observations: List[int], - observation_tensor: List[List[float]], transition_tensor: List[List[List[float]]]) -> float: + def bayes_filter( + s_prime: int, + o: int, + a: int, + b: List[float], + states: List[int], + observations: List[int], + observation_tensor: List[List[float]], + transition_tensor: List[List[List[float]]], + ) -> float: """ A Bayesian filter to compute b[s_prime] of the POMDP @@ -244,7 +284,9 @@ def bayes_filter(s_prime: int, o: int, a: int, b: List[float], states: List[int] temp = 0.0 for s in states: - temp += observation_tensor[s_prime][o] * transition_tensor[a][s][s_prime] * b[s] + temp += ( + observation_tensor[s_prime][o] * transition_tensor[a][s][s_prime] * b[s] + ) b_prime_s_prime = temp / norm if round(b_prime_s_prime, 2) > 1: print(f"b_prime_s_prime >= 1: {b_prime_s_prime}, a1:{a}, s_prime:{s_prime}") @@ -254,8 +296,14 @@ def bayes_filter(s_prime: int, o: int, a: int, b: List[float], states: List[int] return b_prime_s_prime @staticmethod - def p_o_given_b_a1_a2(o: int, b: List[float], a: int, states: List[int], - transition_tensor: List[List[List[float]]], observation_tensor: List[List[float]]) -> float: + def p_o_given_b_a1_a2( + o: int, + b: List[float], + a: int, + states: List[int], + transition_tensor: List[List[List[float]]], + observation_tensor: List[List[float]], + ) -> float: """ Computes P[o|a,b] of the POMDP @@ -270,13 +318,24 @@ def p_o_given_b_a1_a2(o: int, b: List[float], a: int, states: List[int], prob = 0.0 for s in states: for s_prime in states: - prob += b[s] * transition_tensor[a][s][s_prime] * observation_tensor[s_prime][o] + prob += ( + b[s] + * transition_tensor[a][s][s_prime] + * observation_tensor[s_prime][o] + ) assert prob < 1 return prob @staticmethod - def next_belief(o: int, a: int, b: List[float], states: List[int], observations: List[int], - observation_tensor: List[List[float]], transition_tensor: List[List[List[float]]]) -> List[float]: + def next_belief( + o: int, + a: int, + b: List[float], + states: List[int], + observations: List[int], + observation_tensor: List[List[float]], + transition_tensor: List[List[List[float]]], + ) -> List[float]: """ Computes the next belief using a Bayesian filter @@ -292,8 +351,15 @@ def next_belief(o: int, a: int, b: List[float], states: List[int], observations: b_prime = [0.0] * len(states) for s_prime in states: b_prime[s_prime] = IntrusionRecoveryPomdpUtil.bayes_filter( - s_prime=s_prime, o=o, a=a, b=b, states=states, observations=observations, - transition_tensor=transition_tensor, observation_tensor=observation_tensor) + s_prime=s_prime, + o=o, + a=a, + b=b, + states=states, + observations=observations, + transition_tensor=transition_tensor, + observation_tensor=observation_tensor, + ) if round(sum(b_prime), 2) != 1: print(f"error, b_prime:{b_prime}, o:{o}, a:{a}, b:{b}") assert round(sum(b_prime), 2) == 1 @@ -341,5 +407,7 @@ def pomdp_solver_file(config: IntrusionRecoveryPomdpConfig) -> str: for s_prime in config.states: for o in config.observations: c = config.cost_tensor[a][s] - file_str = file_str + f"R: {a} : {s} : {s_prime} : {o} {c:.80f}\n" + file_str = ( + file_str + f"R: {a} : {s} : {s_prime} : {o} {c:.80f}\n" + ) return file_str From 89af47e65d8c99ec63f70fb1b35179b5df2f4070 Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Wed, 15 May 2024 13:47:54 +0200 Subject: [PATCH 15/16] fix --- .../dao/intrusion_recovery_game_config.py | 4 +- .../test_intrusion_recovery_game_config.py | 246 ++++++++++++++++++ 2 files changed, 248 insertions(+), 2 deletions(-) create mode 100644 simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_game_config.py diff --git a/simulation-system/libs/csle-tolerance/src/csle_tolerance/dao/intrusion_recovery_game_config.py b/simulation-system/libs/csle-tolerance/src/csle_tolerance/dao/intrusion_recovery_game_config.py index 5537e34b7..3e8d90662 100644 --- a/simulation-system/libs/csle-tolerance/src/csle_tolerance/dao/intrusion_recovery_game_config.py +++ b/simulation-system/libs/csle-tolerance/src/csle_tolerance/dao/intrusion_recovery_game_config.py @@ -58,7 +58,7 @@ def __str__(self) -> str: """ :return: a string representation of the DTO """ - return (f"eta: {self.eta}, p_a: {self.p_a}, p_c_1: {self.p_c_1}," + return (f"eta: {self.eta}, p_a: {self.p_a}, p_c_1: {self.p_c_1}, " f"BTR: {self.BTR}, negate_costs: {self.negate_costs}, seed: {self.seed}, " f"discount_factor: {self.discount_factor}, states: {self.states}, actions: {self.actions}, " f"observations: {self.observation_tensor}, cost_tensor: {self.cost_tensor}, " @@ -105,7 +105,7 @@ def to_dict(self) -> Dict[str, Any]: d["b1"] = self.b1 d["T"] = self.T d["simulation_env_name"] = self.simulation_env_name - d["gym_env_name"] = self.simulation_env_name + d["gym_env_name"] = self.gym_env_name return d @staticmethod diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_game_config.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_game_config.py new file mode 100644 index 000000000..33e8155e4 --- /dev/null +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_game_config.py @@ -0,0 +1,246 @@ +from csle_tolerance.dao.intrusion_recovery_game_config import ( + IntrusionRecoveryGameConfig, +) +from csle_tolerance.util.intrusion_recovery_pomdp_util import IntrusionRecoveryPomdpUtil +import pytest_mock +import numpy as np + + +class TestIntrusionRecoveryGameConfigSuite: + """ + Test suite for intrusion_recovery_game_config.py + """ + + def test__init__(self) -> None: + """ + Tests the function of initializing the DTO + """ + dto = IntrusionRecoveryGameConfig( + eta=0.5, + p_a=0.8, + p_c_1=0.1, + BTR=10, + negate_costs=True, + seed=123, + discount_factor=0.9, + states=[0, 1, 2], + actions=[0, 1], + observations=[0, 1], + cost_tensor=[[1, 2], [3, 4]], + observation_tensor=[[1, 2], [3, 4]], + transition_tensor=[[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + b1=[0.1, 0.9], + T=100, + simulation_env_name="sim_env", + gym_env_name="gym_env", + max_horizon=1000, + ) + assert dto.eta == 0.5 + assert dto.p_a == 0.8 + assert dto.p_c_1 == 0.1 + assert dto.BTR == 10 + assert dto.negate_costs is True + assert dto.seed == 123 + assert dto.discount_factor == 0.9 + assert dto.states == [0, 1, 2] + assert dto.actions == [0, 1] + assert dto.observations == [0, 1] + assert dto.cost_tensor == [[1, 2], [3, 4]] + assert dto.observation_tensor == [[1, 2], [3, 4]] + assert dto.transition_tensor == [ + [[0.1, 0.2], [0.3, 0.4]], + [[0.5, 0.6], [0.7, 0.8]], + ] + assert dto.b1 == [0.1, 0.9] + assert dto.T == 100 + assert dto.simulation_env_name == "sim_env" + assert dto.gym_env_name == "gym_env" + assert dto.max_horizon == 1000 + + def test__str__(self) -> None: + """ + Tests the function of returning a string representation of the DTO + """ + dto = IntrusionRecoveryGameConfig( + eta=0.5, + p_a=0.8, + p_c_1=0.1, + BTR=10, + negate_costs=True, + seed=123, + discount_factor=0.9, + states=[0, 1, 2], + actions=[0, 1], + observations=[0, 1], + cost_tensor=[[1, 2], [3, 4]], + observation_tensor=[[1, 2], [3, 4]], + transition_tensor=[[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + b1=[0.1, 0.9], + T=100, + simulation_env_name="sim_env", + gym_env_name="gym_env", + max_horizon=1000, + ) + expected = ( + "eta: 0.5, p_a: 0.8, p_c_1: 0.1, BTR: 10, negate_costs: True, seed: 123, " + "discount_factor: 0.9, states: [0, 1, 2], actions: [0, 1], observations: [[1, 2], [3, 4]], " + "cost_tensor: [[1, 2], [3, 4]], observation_tensor: [[1, 2], [3, 4]], " + "transition_tensor: [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], b1:[0.1, 0.9], " + "T: 100, simulation_env_name: sim_env, gym_env_name: gym_env, max_horizon: 1000" + ) + assert dto.__str__() == expected + + def test_from_dict(self) -> None: + """ + Tests the function of converting a dict representation to an instance + """ + dto_dict = { + "eta": 0.5, + "p_a": 0.8, + "p_c_1": 0.1, + "BTR": 10, + "negate_costs": True, + "seed": 123, + "discount_factor": 0.9, + "states": [0, 1, 2], + "actions": [0, 1], + "observations": [0, 1], + "cost_tensor": [[1, 2], [3, 4]], + "observation_tensor": [[1, 2], [3, 4]], + "transition_tensor": [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + "b1": [0.1, 0.9], + "T": 100, + "simulation_env_name": "sim_env", + "gym_env_name": "gym_env", + } + dto = IntrusionRecoveryGameConfig.from_dict(dto_dict) + assert dto.eta == 0.5 + assert dto.p_a == 0.8 + assert dto.p_c_1 == 0.1 + assert dto.BTR == 10 + assert dto.negate_costs is True + assert dto.seed == 123 + assert dto.discount_factor == 0.9 + assert dto.states == [0, 1, 2] + assert dto.actions == [0, 1] + assert dto.observations == [0, 1] + assert dto.cost_tensor == [[1, 2], [3, 4]] + assert dto.observation_tensor == [[1, 2], [3, 4]] + assert dto.transition_tensor == [ + [[0.1, 0.2], [0.3, 0.4]], + [[0.5, 0.6], [0.7, 0.8]], + ] + assert dto.b1 == [0.1, 0.9] + assert dto.T == 100 + assert dto.simulation_env_name == "sim_env" + assert dto.gym_env_name == "gym_env" + + def test_to_dict(self) -> None: + """ + Tests the function of getting a dict representation of the object + """ + dto = IntrusionRecoveryGameConfig( + eta=0.5, + p_a=0.8, + p_c_1=0.1, + BTR=10, + negate_costs=True, + seed=123, + discount_factor=0.9, + states=[0, 1, 2], + actions=[0, 1], + observations=[0, 1], + cost_tensor=[[1, 2], [3, 4]], + observation_tensor=[[1, 2], [3, 4]], + transition_tensor=[[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + b1=[0.1, 0.9], + T=100, + simulation_env_name="sim_env", + gym_env_name="gym_env", + ) + expected = { + "eta": 0.5, + "p_a": 0.8, + "p_c_1": 0.1, + "BTR": 10, + "negate_costs": True, + "seed": 123, + "discount_factor": 0.9, + "states": [0, 1, 2], + "actions": [0, 1], + "observations": [0, 1], + "cost_tensor": [[1, 2], [3, 4]], + "observation_tensor": [[1, 2], [3, 4]], + "transition_tensor": [[[0.1, 0.2], [0.3, 0.4]], [[0.5, 0.6], [0.7, 0.8]]], + "b1": [0.1, 0.9], + "T": 100, + "simulation_env_name": "sim_env", + "gym_env_name": "gym_env", + } + assert dto.to_dict() == expected + + def test_from_json_file(self, mocker: pytest_mock.MockFixture) -> None: + """ + tests the function of reading a json file and converting it to a DTO + """ + eta = 2 + p_a = 0.05 + p_c_1 = 0.01 + p_c_2 = 0.01 + p_u = 0.0 + BTR = np.inf + negate_costs = False + discount_factor = 1 - p_c_1 + num_observations = 100 + simulation_name = "csle-tolerance-intrusion-recovery-pomdp-defender-001" + cost_tensor = IntrusionRecoveryPomdpUtil.cost_tensor( + eta=eta, + states=IntrusionRecoveryPomdpUtil.state_space(), + actions=IntrusionRecoveryPomdpUtil.action_space(), + negate=negate_costs, + ) + observation_tensor = IntrusionRecoveryPomdpUtil.observation_tensor( + states=IntrusionRecoveryPomdpUtil.state_space(), + observations=IntrusionRecoveryPomdpUtil.observation_space( + num_observations=num_observations + ), + ) + transition_tensor = IntrusionRecoveryPomdpUtil.transition_tensor_game( + states=IntrusionRecoveryPomdpUtil.state_space(), + defender_actions=IntrusionRecoveryPomdpUtil.action_space(), + attacker_actions=IntrusionRecoveryPomdpUtil.action_space(), + p_a=p_a, + p_c_1=p_c_1, + ) + config = IntrusionRecoveryGameConfig( + eta=eta, + p_a=p_a, + p_c_1=p_c_1, + BTR=BTR, + negate_costs=negate_costs, + seed=999, + discount_factor=discount_factor, + states=IntrusionRecoveryPomdpUtil.state_space(), + actions=IntrusionRecoveryPomdpUtil.action_space(), + observations=IntrusionRecoveryPomdpUtil.observation_space( + num_observations=num_observations + ), + cost_tensor=cost_tensor, + observation_tensor=observation_tensor, + transition_tensor=transition_tensor, + b1=IntrusionRecoveryPomdpUtil.initial_belief(p_a=p_a), + T=BTR, + simulation_env_name=simulation_name, + gym_env_name="csle-tolerance-intrusion-recovery-pomdp-v1", + ) + mocker.patch( + "io.open", side_effect=mocker.mock_open(read_data=config.to_json_str()) + ) + dto = IntrusionRecoveryGameConfig.from_json_file("path") + assert dto.eta == 2 + assert dto.p_a == 0.05 + assert dto.p_c_1 == 0.01 + assert dto.BTR == np.inf + assert dto.negate_costs is False + assert dto.seed == 999 + assert dto.discount_factor == 1 - p_c_1 From 36b2a72e5ee1026db69143de420816d8bdc7124f Mon Sep 17 00:00:00 2001 From: Yuhu-kth Date: Wed, 15 May 2024 13:50:32 +0200 Subject: [PATCH 16/16] fixed --- .../util/intrusion_recovery_pomdp_util.py | 90 ++++++++++++++----- .../test_intrusion_recovery_game_config.py | 2 - 2 files changed, 66 insertions(+), 26 deletions(-) diff --git a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py index f9eb9b370..5bb8197bb 100644 --- a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py +++ b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py @@ -1,8 +1,13 @@ from typing import List from scipy.stats import betabinom import numpy as np -from csle_tolerance.dao.intrusion_recovery_pomdp_config import IntrusionRecoveryPomdpConfig -from csle_tolerance.dao.intrusion_recovery_game_config import IntrusionRecoveryGameConfig +from csle_tolerance.dao.intrusion_recovery_pomdp_config import ( + IntrusionRecoveryPomdpConfig, +) +from csle_tolerance.dao.intrusion_recovery_game_config import ( + IntrusionRecoveryGameConfig, +) + class IntrusionRecoveryPomdpUtil: """ @@ -179,7 +184,9 @@ def transition_function( return 0 @staticmethod - def transition_function_game(s: int, s_prime: int, a1: int, a2: int, p_a: float, p_c_1: float) -> float: + def transition_function_game( + s: int, s_prime: int, a1: int, a2: int, p_a: float, p_c_1: float + ) -> float: """ The transition function of the POSG @@ -197,10 +204,13 @@ def transition_function_game(s: int, s_prime: int, a1: int, a2: int, p_a: float, return p_c_1 elif s_prime == 0 and a1 == 0 and a2 == 1 and s == 0: return (1 - p_a) * (1 - p_c_1) - elif (s_prime == 0 and a2 == 0 and s == 0) or (s_prime == 0 and s == 1 and a1 == 1) \ - or (s_prime == 1 and s == 1 and a1 == 0): - return (1 - p_c_1) - elif (s_prime == 1 and s == 0 and a2 == 1): + elif ( + (s_prime == 0 and a2 == 0 and s == 0) + or (s_prime == 0 and s == 1 and a1 == 1) + or (s_prime == 1 and s == 1 and a1 == 0) + ): + return 1 - p_c_1 + elif s_prime == 1 and s == 0 and a2 == 1: return (1 - p_c_1) * p_a else: return 0 @@ -232,15 +242,29 @@ def transition_tensor( for s in states: s_a_transitions = [] for s_prime in states: - s_a_transitions.append(IntrusionRecoveryPomdpUtil.transition_function( - s=s, s_prime=s_prime, a=a, p_a=p_a, p_c_1=p_c_1, p_c_2=p_c_2, p_u=p_u)) + s_a_transitions.append( + IntrusionRecoveryPomdpUtil.transition_function( + s=s, + s_prime=s_prime, + a=a, + p_a=p_a, + p_c_1=p_c_1, + p_c_2=p_c_2, + p_u=p_u, + ) + ) a_transitions.append(s_a_transitions) transition_tensor.append(a_transitions) return transition_tensor @staticmethod - def transition_tensor_game(states: List[int], defender_actions: List[int], attacker_actions: List[int], - p_a: float, p_c_1: float) -> List[List[List[List[float]]]]: + def transition_tensor_game( + states: List[int], + defender_actions: List[int], + attacker_actions: List[int], + p_a: float, + p_c_1: float, + ) -> List[List[List[List[float]]]]: """ Creates a |A|x|A|x|S|x|S| tensor with the transition probabilities of the POSG @@ -259,8 +283,11 @@ def transition_tensor_game(states: List[int], defender_actions: List[int], attac for s in states: s_a1_a2_transitions = [] for s_prime in states: - s_a1_a2_transitions.append(IntrusionRecoveryPomdpUtil.transition_function_game( - s=s, s_prime=s_prime, a1=a1, a2=a2, p_a=p_a, p_c_1=p_c_1)) + s_a1_a2_transitions.append( + IntrusionRecoveryPomdpUtil.transition_function_game( + s=s, s_prime=s_prime, a1=a1, a2=a2, p_a=p_a, p_c_1=p_c_1 + ) + ) a2_transitions.append(s_a1_a2_transitions) a1_transitions.append(a2_transitions) transition_tensor.append(a1_transitions) @@ -295,7 +322,9 @@ def sample_next_observation( return int(o) @staticmethod - def sample_next_state_game(transition_tensor: List[List[List[List[float]]]], s: int, a1: int, a2: int) -> int: + def sample_next_state_game( + transition_tensor: List[List[List[List[float]]]], s: int, a1: int, a2: int + ) -> int: """ Samples the next observation @@ -305,7 +334,10 @@ def sample_next_state_game(transition_tensor: List[List[List[List[float]]]], s: :param transition_tensor: the transition tensor :return: the next state a """ - s_prime = np.random.choice(np.arange(0, len(transition_tensor[a1][a2][s])), p=transition_tensor[a1][a2][s]) + s_prime = np.random.choice( + np.arange(0, len(transition_tensor[a1][a2][s])), + p=transition_tensor[a1][a2][s], + ) return int(s_prime) @staticmethod @@ -520,12 +552,16 @@ def generate_os_posg_game_file(game_config: IntrusionRecoveryGameConfig) -> str: :return: a string with the contents of the config file """ num_partitions = 1 - transitions = IntrusionRecoveryPomdpUtil.generate_transitions(game_config=game_config) + transitions = IntrusionRecoveryPomdpUtil.generate_transitions( + game_config=game_config + ) rewards = IntrusionRecoveryPomdpUtil.generate_rewards(game_config=game_config) - game_description = f"{len(game_config.states)} {num_partitions} {len(game_config.actions)} " \ - f"{len(game_config.actions)} " \ - f"{len(game_config.observations)} {len(transitions)} " \ - f"{len(rewards)} {game_config.discount_factor}" + game_description = ( + f"{len(game_config.states)} {num_partitions} {len(game_config.actions)} " + f"{len(game_config.actions)} " + f"{len(game_config.observations)} {len(transitions)} " + f"{len(rewards)} {game_config.discount_factor}" + ) state_desriptions = [] for s in game_config.states: state_desriptions.append(f"{s} {0}") @@ -534,16 +570,22 @@ def generate_os_posg_game_file(game_config: IntrusionRecoveryGameConfig) -> str: player_2_legal_actions = [] for _ in game_config.states: - player_2_legal_actions.append(" ".join(list(map(lambda x: str(x), game_config.actions)))) + player_2_legal_actions.append( + " ".join(list(map(lambda x: str(x), game_config.actions))) + ) player_1_legal_actions = [] - player_1_legal_actions.append(" ".join(list(map(lambda x: str(x), game_config.actions)))) + player_1_legal_actions.append( + " ".join(list(map(lambda x: str(x), game_config.actions))) + ) obs_desriptions = [] for i, o in enumerate(game_config.observations): obs_desriptions.append(f"o_{o}") - initial_belief_str = f"{0} {' '.join(list(map(lambda x: str(x), game_config.b1)))}" + initial_belief_str = ( + f"{0} {' '.join(list(map(lambda x: str(x), game_config.b1)))}" + ) game_file_str = "" game_file_str = game_file_str + game_description + "\n" game_file_str = game_file_str + "\n".join(state_desriptions) + "\n" @@ -555,6 +597,6 @@ def generate_os_posg_game_file(game_config: IntrusionRecoveryGameConfig) -> str: game_file_str = game_file_str + "\n".join(transitions) + "\n" game_file_str = game_file_str + "\n".join(rewards) + "\n" game_file_str = game_file_str + initial_belief_str - with open('recovery_game.txt', 'w') as f: + with open("recovery_game.txt", "w") as f: f.write(game_file_str) return game_file_str diff --git a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_game_config.py b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_game_config.py index 33e8155e4..f7fef9f78 100644 --- a/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_game_config.py +++ b/simulation-system/libs/csle-tolerance/tests/test_intrusion_recovery_game_config.py @@ -186,8 +186,6 @@ def test_from_json_file(self, mocker: pytest_mock.MockFixture) -> None: eta = 2 p_a = 0.05 p_c_1 = 0.01 - p_c_2 = 0.01 - p_u = 0.0 BTR = np.inf negate_costs = False discount_factor = 1 - p_c_1