diff --git a/feature_integration_tests/test_cases/tests/basic/test_orchestration_with_persistency.py b/feature_integration_tests/test_cases/tests/basic/test_orchestration_with_persistency.py index 8659bb90e3..368fc437f9 100644 --- a/feature_integration_tests/test_cases/tests/basic/test_orchestration_with_persistency.py +++ b/feature_integration_tests/test_cases/tests/basic/test_orchestration_with_persistency.py @@ -57,3 +57,228 @@ def test_kvs_write_results(self, temp_dir: Path, run_count: int): kvs_file = temp_dir / "kvs_1_0.json" data = json.loads(kvs_file.read_text()) assert data["v"]["run_cycle_number"]["v"] == run_count + + +@add_test_properties( + partially_verifies=["feat_req__persistency__multiple_kvs"], + test_type="requirements-based", + derivation_technique="requirements-analysis", +) +class TestConcurrentKVS(FitScenario): + """ + Tests persistency running in orchestration scenario using multiple KVS files concurrently. + """ + + @pytest.fixture(scope="class") + def scenario_name(self) -> str: + return "basic.concurrent_kvs" + + @pytest.fixture(scope="class", params=[1, 5]) + def run_count(self, request) -> int: + return request.param + + @pytest.fixture(scope="class") + def temp_dir( + self, + tmp_path_factory: pytest.TempPathFactory, + run_count: int, # run_count is required to ensure proper order of fixture calls + ) -> Generator[Path, None, None]: + yield from temp_dir_common(tmp_path_factory, self.__class__.__name__) + + @pytest.fixture(scope="class") + def test_config(self, run_count: int, temp_dir: Path) -> dict[str, Any]: + return { + "runtime": {"task_queue_size": 2, "workers": 4}, + "test": {"run_count": run_count, "kvs_path": str(temp_dir)}, + } + + def test_kvs_logged_execution(self, run_count: int, logs_info_level: LogContainer): + """Verify that all runs have been logged.""" + logs = logs_info_level.get_logs(field="run_cycle_number") + for log_group in logs.group_by("kvs_instance_id").values(): + logged_cycles = [log.run_cycle_number for 
log in log_group] + expected_cycles = list(range(1, run_count + 1)) + assert logged_cycles == expected_cycles + + def test_kvs_write_results(self, temp_dir: Path, run_count: int): + """Verify that each KVS file contains correct final run count.""" + # Verify KVS Instance(1) + kvs1_file = temp_dir / "kvs_1_0.json" + data1 = json.loads(kvs1_file.read_text()) + assert data1["v"]["run_cycle_number"]["v"] == run_count + + # Verify KVS Instance(2) + kvs2_file = temp_dir / "kvs_2_0.json" + data2 = json.loads(kvs2_file.read_text()) + assert data2["v"]["run_cycle_number"]["v"] == run_count + + # Verify KVS Instance(3) + kvs3_file = temp_dir / "kvs_3_0.json" + data3 = json.loads(kvs3_file.read_text()) + assert data3["v"]["run_cycle_number"]["v"] == run_count + + +@add_test_properties( + partially_verifies=[ + "feat_req__persistency__multiple_kvs", + "feat_req__persistency__support_datatype_value", + ], + test_type="requirements-based", + derivation_technique="requirements-analysis", +) +class TestMultipleKVS(FitScenario): + """ + Tests persistency running in orchestration scenario using multiple KVS instances. + Validates all basic data types stored in KVS. 
+ """ + + @pytest.fixture(scope="class") + def scenario_name(self) -> str: + return "basic.multiple_kvs" + + @pytest.fixture(scope="class") + def run_count(self) -> int: + return 1 + + @pytest.fixture(scope="class") + def temp_dir( + self, + tmp_path_factory: pytest.TempPathFactory, + run_count: int, # run_count is required to ensure proper order of fixture calls + ) -> Generator[Path, None, None]: + yield from temp_dir_common(tmp_path_factory, self.__class__.__name__) + + @pytest.fixture(scope="class") + def test_config(self, run_count: int, temp_dir: Path) -> dict[str, Any]: + return { + "runtime": {"task_queue_size": 256, "workers": 4}, + "test": {"run_count": run_count, "kvs_path": str(temp_dir)}, + } + + def test_kvs_cycle_write_results(self, temp_dir: Path, run_count: int): + """Verify that KVS file contains correct final run count.""" + # Verify KVS Instance(1) + kvs1_file = temp_dir / "kvs_1_0.json" + data1 = json.loads(kvs1_file.read_text()) + assert data1["v"]["run_cycle_number"]["v"] == run_count + + def test_kvs_write_i32_max(self, temp_dir: Path): + """Verify that KVS file contains correct value.""" + # Verify KVS Instance(2) + kvs2_file = temp_dir / "kvs_2_0.json" + data2 = json.loads(kvs2_file.read_text()) + assert data2["v"]["key_i32_max"]["v"] == 2147483647 + assert data2["v"]["key_i32_max"]["t"] == "i32" + + def test_kvs_write_i32_min(self, temp_dir: Path): + """Verify that KVS file contains correct value.""" + # Verify KVS Instance(2) + kvs2_file = temp_dir / "kvs_2_0.json" + data2 = json.loads(kvs2_file.read_text()) + assert data2["v"]["key_i32_min"]["v"] == -2147483648 + assert data2["v"]["key_i32_min"]["t"] == "i32" + + def test_kvs_write_u32_max(self, temp_dir: Path): + """Verify that KVS file contains correct value.""" + # Verify KVS Instance(2) + kvs2_file = temp_dir / "kvs_2_0.json" + data2 = json.loads(kvs2_file.read_text()) + assert data2["v"]["key_u32_max"]["v"] == 4294967295 + assert data2["v"]["key_u32_max"]["t"] == "u32" + + def 
test_kvs_write_u32_min(self, temp_dir: Path): + """Verify that KVS file contains correct value.""" + # Verify KVS Instance(2) + kvs2_file = temp_dir / "kvs_2_0.json" + data2 = json.loads(kvs2_file.read_text()) + assert data2["v"]["key_u32_min"]["v"] == 0 + assert data2["v"]["key_u32_min"]["t"] == "u32" + + @pytest.mark.xfail(reason="https://github.com/eclipse-score/persistency/issues/204") + def test_kvs_write_i64_max(self, temp_dir: Path): + """Verify that KVS file contains correct value.""" + # Verify KVS Instance(2) + kvs2_file = temp_dir / "kvs_2_0.json" + data2 = json.loads(kvs2_file.read_text()) + assert data2["v"]["key_i64_max"]["v"] == 9223372036854775807 + assert data2["v"]["key_i64_max"]["t"] == "i64" + + @pytest.mark.xfail(reason="https://github.com/eclipse-score/persistency/issues/204") + def test_kvs_write_i64_min(self, temp_dir: Path): + """Verify that KVS file contains correct value.""" + # Verify KVS Instance(2) + kvs2_file = temp_dir / "kvs_2_0.json" + data2 = json.loads(kvs2_file.read_text()) + assert data2["v"]["key_i64_min"]["v"] == -9223372036854775808 + assert data2["v"]["key_i64_min"]["t"] == "i64" + + @pytest.mark.xfail(reason="https://github.com/eclipse-score/persistency/issues/204") + def test_kvs_write_u64_max(self, temp_dir: Path): + """Verify that KVS file contains correct value.""" + # Verify KVS Instance(2) + kvs2_file = temp_dir / "kvs_2_0.json" + data2 = json.loads(kvs2_file.read_text()) + assert data2["v"]["key_u64_max"]["v"] == 18446744073709551615 + assert data2["v"]["key_u64_max"]["t"] == "u64" + + def test_kvs_write_u64_min(self, temp_dir: Path): + """Verify that KVS file contains correct value.""" + # Verify KVS Instance(2) + kvs2_file = temp_dir / "kvs_2_0.json" + data2 = json.loads(kvs2_file.read_text()) + assert data2["v"]["key_u64_min"]["v"] == 0 + assert data2["v"]["key_u64_min"]["t"] == "u64" + + def test_kvs_write_f64(self, temp_dir: Path): + """Verify that KVS file contains correct value.""" + # Verify KVS Instance(2) 
+ kvs2_file = temp_dir / "kvs_2_0.json" + data2 = json.loads(kvs2_file.read_text()) + assert data2["v"]["key_f64"]["v"] == 1.2345 + assert data2["v"]["key_f64"]["t"] == "f64" + + def test_kvs_write_bool(self, temp_dir: Path): + """Verify that KVS file contains correct value.""" + # Verify KVS Instance(2) + kvs2_file = temp_dir / "kvs_2_0.json" + data2 = json.loads(kvs2_file.read_text()) + assert data2["v"]["key_bool"]["v"] == True # noqa: E712 + assert data2["v"]["key_bool"]["t"] == "bool" + + def test_kvs_write_string(self, temp_dir: Path): + """Verify that KVS file contains correct value.""" + # Verify KVS Instance(2) + kvs2_file = temp_dir / "kvs_2_0.json" + data2 = json.loads(kvs2_file.read_text()) + assert data2["v"]["key_String"]["v"] == "TestString" + assert data2["v"]["key_String"]["t"] == "str" + + def test_kvs_write_null(self, temp_dir: Path): + """Verify that KVS file contains correct value.""" + # Verify KVS Instance(2) + kvs2_file = temp_dir / "kvs_2_0.json" + data2 = json.loads(kvs2_file.read_text()) + assert data2["v"]["key_Null"]["v"] is None + assert data2["v"]["key_Null"]["t"] == "null" + + def test_kvs_write_array(self, temp_dir: Path): + """Verify that KVS file contains correct values.""" + # Verify KVS Instance(2) + kvs2_file = temp_dir / "kvs_2_0.json" + data2 = json.loads(kvs2_file.read_text()) + assert data2["v"]["key_Array"]["v"] == [ + {"t": "i32", "v": 1}, + {"t": "i32", "v": 2}, + {"t": "i32", "v": 3}, + ] + assert data2["v"]["key_Array"]["t"] == "arr" + + def test_kvs_write_map(self, temp_dir: Path): + """Verify that KVS file contains correct values.""" + # Verify KVS Instance(2) + kvs2_file = temp_dir / "kvs_2_0.json" + data2 = json.loads(kvs2_file.read_text()) + assert data2["v"]["key_Map"]["v"] == { + "inner_key": {"t": "i32", "v": 1}, + } + assert data2["v"]["key_Map"]["t"] == "obj" diff --git a/feature_integration_tests/test_scenarios/rust/src/scenarios/basic/mod.rs 
b/feature_integration_tests/test_scenarios/rust/src/scenarios/basic/mod.rs index cb871dd13a..de39fdfc3b 100644 --- a/feature_integration_tests/test_scenarios/rust/src/scenarios/basic/mod.rs +++ b/feature_integration_tests/test_scenarios/rust/src/scenarios/basic/mod.rs @@ -11,13 +11,16 @@ // mod orchestration_with_persistency; -use orchestration_with_persistency::OrchestrationWithPersistency; use test_scenarios_rust::scenario::{ScenarioGroup, ScenarioGroupImpl}; pub fn basic_scenario_group() -> Box<dyn ScenarioGroup> { Box::new(ScenarioGroupImpl::new( "basic", - vec![Box::new(OrchestrationWithPersistency)], + vec![ + Box::new(orchestration_with_persistency::OrchestrationWithPersistency), + Box::new(orchestration_with_persistency::ConcurrentKvsOrchestrationWithPersistency), + Box::new(orchestration_with_persistency::MultipleKvsOrchestrationWithPersistency), + ], vec![], )) } diff --git a/feature_integration_tests/test_scenarios/rust/src/scenarios/basic/orchestration_with_persistency.rs b/feature_integration_tests/test_scenarios/rust/src/scenarios/basic/orchestration_with_persistency.rs index ceab4b4b15..baca581638 100644 --- a/feature_integration_tests/test_scenarios/rust/src/scenarios/basic/orchestration_with_persistency.rs +++ b/feature_integration_tests/test_scenarios/rust/src/scenarios/basic/orchestration_with_persistency.rs @@ -18,14 +18,12 @@ use orchestration::{ api::{design::Design, Orchestration}, common::DesignConfig, }; - -use rust_kvs::kvs_api::KvsApi; -use rust_kvs::Kvs; -use rust_kvs::KvsBuilder; +use rust_kvs::prelude::{InstanceId, KvsDefaults, KvsLoad, KvsMap, KvsValue}; +use rust_kvs::{kvs_api::KvsApi, Kvs, KvsBuilder}; use serde_json::Value; - +use std::{sync::Arc, vec}; use test_scenarios_rust::scenario::Scenario; -use tracing::info; +use tracing::{field, info}; use serde::Deserialize; @@ -42,50 +40,25 @@ impl TestInput { } } -use rust_kvs::prelude::{InstanceId, KvsDefaults, KvsLoad}; -use std::path::PathBuf; -pub struct KvsParameters { - pub instance_id: InstanceId,
- pub defaults: Option<KvsDefaults>, - pub kvs_load: Option<KvsLoad>, - pub dir: Option<PathBuf>, - pub snapshot_max_count: Option<usize>, -} +fn create_kvs(instance_id: usize, path: String) -> Arc<Kvs> { + let builder = KvsBuilder::new(InstanceId(instance_id)) + .defaults(KvsDefaults::Optional) + .dir(path.clone()) + .kvs_load(KvsLoad::Optional) + .snapshot_max_count(3); -macro_rules! persistency_task { - ($path:expr) => { - move || kvs_save_cycle_number($path.clone()) - }; + let kvs = builder.build().expect("Failed to build KVS instance"); + Arc::new(kvs) } -async fn kvs_save_cycle_number(path: String) -> Result<(), UserErrValue> { - let params = KvsParameters { - instance_id: InstanceId(1), - defaults: Some(KvsDefaults::Optional), - kvs_load: Some(KvsLoad::Optional), - dir: Some(PathBuf::from(path)), - snapshot_max_count: Some(3), - }; - - // Set builder parameters. - let mut builder = KvsBuilder::new(params.instance_id); - if let Some(flag) = params.defaults { - builder = builder.defaults(flag); - } - if let Some(flag) = params.kvs_load { - builder = builder.kvs_load(flag); - } - if let Some(dir) = params.dir { - builder = builder.dir(dir.to_string_lossy().to_string()); - } - if let Some(max_count) = params.snapshot_max_count { - builder = builder.snapshot_max_count(max_count); - } - - // Create KVS. - let kvs: Kvs = builder.build().expect("Failed to build KVS instance"); +macro_rules! persistency_cycle_task { + ($kvs:expr) => {{ + let kvs = Arc::clone(&$kvs); + move || kvs_save_cycle_number(Arc::clone(&kvs)) + }}; +} - // Simple set/get.
+async fn kvs_save_cycle_number(kvs: Arc<Kvs>) -> Result<(), UserErrValue> { let key = "run_cycle_number"; let last_cycle_number: u32 = kvs.get_value_as::<u32>(key).unwrap_or_else(|_| 0_u32); @@ -95,24 +68,86 @@ async fn kvs_save_cycle_number(path: String) -> Result<(), UserErrValue> { kvs.flush().expect("Failed to flush KVS"); - info!(run_cycle_number = value_read); + info!( + kvs_instance_id = field::debug(kvs.parameters().instance_id), + run_cycle_number = value_read + ); + + Ok(()) +} + +macro_rules! persistency_save_task { + ($kvs:expr) => {{ + let kvs = Arc::clone(&$kvs); + move || kvs_save_data(Arc::clone(&kvs)) + }}; +} + +async fn kvs_save_data(kvs: Arc<Kvs>) -> Result<(), UserErrValue> { + // i32 + kvs.set_value("key_i32_min", i32::MIN) + .expect("Failed to set value"); + kvs.set_value("key_i32_max", i32::MAX) + .expect("Failed to set value"); + // u32 + kvs.set_value("key_u32_min", u32::MIN) + .expect("Failed to set value"); + kvs.set_value("key_u32_max", u32::MAX) + .expect("Failed to set value"); + // i64 + kvs.set_value("key_i64_min", i64::MIN) + .expect("Failed to set value"); + kvs.set_value("key_i64_max", i64::MAX) + .expect("Failed to set value"); + // u64 + kvs.set_value("key_u64_min", u64::MIN) + .expect("Failed to set value"); + kvs.set_value("key_u64_max", u64::MAX) + .expect("Failed to set value"); + // f64 + kvs.set_value("key_f64", 1.2345_f64) + .expect("Failed to set value"); + // bool + kvs.set_value("key_bool", true) + .expect("Failed to set value"); + // String + kvs.set_value("key_String", "TestString".to_string()) + .expect("Failed to set value"); + // Null + kvs.set_value("key_Null", KvsValue::Null) + .expect("Failed to set value"); + // Array + kvs.set_value( + "key_Array", + vec![ + KvsValue::from(1i32), + KvsValue::from(2i32), + KvsValue::from(3i32), + ], + ) + .expect("Failed to set value"); + // Map + let mut map = KvsMap::new(); + map.insert("inner_key".to_string(), KvsValue::from(1i32)); + kvs.set_value("key_Map", map).expect("Failed to set
value"); + + kvs.flush().expect("Failed to flush KVS"); Ok(()) } fn single_sequence_design(kvs_path: String) -> Result { let mut design = Design::new("SingleSequence".into(), DesignConfig::default()); + let kvs = create_kvs(1, kvs_path); let kvs_cycle_tag = - design.register_invoke_async("KVS save cycle".into(), persistency_task!(kvs_path))?; + design.register_invoke_async("KVS save cycle".into(), persistency_cycle_task!(kvs))?; - // Create a program with actions design.add_program(file!(), move |_design_instance, builder| { builder.with_run_action( SequenceBuilder::new() .with_step(Invoke::from_tag(&kvs_cycle_tag, _design_instance.config())) .build(), ); - Ok(()) }); @@ -147,3 +182,114 @@ impl Scenario for OrchestrationWithPersistency { Ok(()) } } + +fn concurrent_kvs_design(kvs_path: String) -> Result { + let mut design = Design::new("ConcurrentKvs".into(), DesignConfig::default()); + + let kvs1 = create_kvs(1, kvs_path.clone()); + let kvs1_tag = + design.register_invoke_async("KVS1 save cycle".into(), persistency_cycle_task!(kvs1))?; + + let kvs2 = create_kvs(2, kvs_path.clone()); + let kvs2_tag = + design.register_invoke_async("KVS2 save cycle".into(), persistency_cycle_task!(kvs2))?; + + let kvs3 = create_kvs(3, kvs_path.clone()); + let kvs3_tag = + design.register_invoke_async("KVS3 save cycle".into(), persistency_cycle_task!(kvs3))?; + + design.add_program(file!(), move |_design_instance, builder| { + builder.with_run_action( + ConcurrencyBuilder::new() + .with_branch(Invoke::from_tag(&kvs1_tag, _design_instance.config())) + .with_branch(Invoke::from_tag(&kvs2_tag, _design_instance.config())) + .with_branch(Invoke::from_tag(&kvs3_tag, _design_instance.config())) + .build(_design_instance), + ); + Ok(()) + }); + + Ok(design) +} + +pub struct ConcurrentKvsOrchestrationWithPersistency; + +impl Scenario for ConcurrentKvsOrchestrationWithPersistency { + fn name(&self) -> &str { + "concurrent_kvs" + } + + fn run(&self, input: &str) -> Result<(), String> { + let 
logic = TestInput::new(input); + let mut rt = Runtime::from_json(input)?.build(); + + let orch = Orchestration::new() + .add_design(concurrent_kvs_design(logic.kvs_path).expect("Failed to create design")) + .design_done(); + + let mut program_manager = orch + .into_program_manager() + .expect("Failed to create programs"); + let mut programs = program_manager.get_programs(); + + rt.block_on(async move { + let mut program = programs.pop().expect("Failed to pop program"); + let _ = program.run_n(logic.run_count).await; + }); + + Ok(()) + } +} + +fn multiple_kvs_design(kvs_path: String) -> Result { + let mut design = Design::new("MultipleKvs".into(), DesignConfig::default()); + + let kvs1 = create_kvs(1, kvs_path.clone()); + let kvs1_tag = + design.register_invoke_async("KVS1 save cycle".into(), persistency_cycle_task!(kvs1))?; + + let kvs2 = create_kvs(2, kvs_path.clone()); + let kvs2_tag = + design.register_invoke_async("KVS2 save cycle".into(), persistency_save_task!(kvs2))?; + + design.add_program(file!(), move |_design_instance, builder| { + builder.with_run_action( + ConcurrencyBuilder::new() + .with_branch(Invoke::from_tag(&kvs1_tag, _design_instance.config())) + .with_branch(Invoke::from_tag(&kvs2_tag, _design_instance.config())) + .build(_design_instance), + ); + Ok(()) + }); + + Ok(design) +} + +pub struct MultipleKvsOrchestrationWithPersistency; + +impl Scenario for MultipleKvsOrchestrationWithPersistency { + fn name(&self) -> &str { + "multiple_kvs" + } + + fn run(&self, input: &str) -> Result<(), String> { + let logic = TestInput::new(input); + let mut rt = Runtime::from_json(input)?.build(); + + let orch = Orchestration::new() + .add_design(multiple_kvs_design(logic.kvs_path).expect("Failed to create design")) + .design_done(); + + let mut program_manager = orch + .into_program_manager() + .expect("Failed to create programs"); + let mut programs = program_manager.get_programs(); + + rt.block_on(async move { + let mut program = 
programs.pop().expect("Failed to pop program"); + let _ = program.run_n(logic.run_count).await; + }); + + Ok(()) + } +}