diff --git a/sim/src/utils/errors.rs b/sim/src/utils/errors.rs
index 887128a..5f16faa 100644
--- a/sim/src/utils/errors.rs
+++ b/sim/src/utils/errors.rs
@@ -47,6 +47,10 @@ pub enum SimulationError {
     #[error("Failed to convert to a Float value")]
     FloatConvError,
 
+    /// Represents a message unexpectedly lost/dropped/stuck during simulation execution
+    #[error("A message was unexpectedly lost, dropped, or stuck during simulation execution")]
+    DroppedMessageError,
+
     /// Transparent serde_json errors
     #[error(transparent)]
     JSONError(#[from] serde_json::error::Error),
diff --git a/sim/tests/coupled.rs b/sim/tests/coupled.rs
index b2be692..b5257c5 100644
--- a/sim/tests/coupled.rs
+++ b/sim/tests/coupled.rs
@@ -4,13 +4,14 @@ use sim::models::{
 };
 use sim::output_analysis::{ConfidenceInterval, SteadyStateOutput};
 use sim::simulator::{Connector, Message, Simulation};
+use sim::utils::errors::SimulationError;
 
-fn get_message_number(message: &str) -> &str {
-    message.split_whitespace().last().unwrap()
+fn get_message_number(message: &str) -> Option<&str> {
+    message.split_whitespace().last()
 }
 
 #[test]
-fn closure_under_coupling() {
+fn closure_under_coupling() -> Result<(), SimulationError> {
     let atomic_models = vec![
         Model::new(
             String::from("generator-01"),
@@ -137,58 +138,58 @@ fn closure_under_coupling() {
     ]
     .iter()
     .enumerate()
-    .map(|(index, (models, connectors))| {
-        let mut simulation = Simulation::post(models.to_vec(), connectors.to_vec());
-        let message_records: Vec<Message> = simulation.step_n(1000).unwrap();
-        let arrivals: Vec<(&f64, &str)>;
-        let departures: Vec<(&f64, &str)>;
-        match index {
-            0 => {
-                arrivals = message_records
-                    .iter()
-                    .filter(|message_record| message_record.target_id() == "processor-01")
-                    .map(|message_record| (message_record.time(), message_record.content()))
-                    .collect();
-                departures = message_records
-                    .iter()
-                    .filter(|message_record| message_record.target_id() == "storage-01")
-                    .map(|message_record| (message_record.time(), message_record.content()))
-                    .collect();
-            }
-            _ => {
-                arrivals = message_records
-                    .iter()
-                    .filter(|message_record| message_record.target_id() == "storage-02")
-                    .filter(|message_record| message_record.source_port() == "start")
-                    .map(|message_record| (message_record.time(), message_record.content()))
-                    .collect();
-                departures = message_records
-                    .iter()
-                    .filter(|message_record| message_record.target_id() == "storage-02")
-                    .filter(|message_record| message_record.source_port() == "stop")
-                    .map(|message_record| (message_record.time(), message_record.content()))
-                    .collect();
-            }
-        }
-        let response_times: Vec<f64> = departures
-            .iter()
-            .map(|departure| {
-                departure.0
-                    - arrivals
-                        .iter()
-                        .find(|arrival| {
-                            get_message_number(&arrival.1) == get_message_number(&departure.1)
-                        })
-                        .unwrap()
-                        .0
-            })
-            .collect();
-        let mut response_times_sample = SteadyStateOutput::post(response_times);
-        response_times_sample
-            .confidence_interval_mean(0.001)
-            .unwrap()
-    })
-    .collect();
+    .map(
+        |(index, (models, connectors))| -> Result<ConfidenceInterval<f64>, SimulationError> {
+            let mut simulation = Simulation::post(models.to_vec(), connectors.to_vec());
+            let message_records: Vec<Message> = simulation.step_n(1000)?;
+            let arrivals: Vec<(&f64, &str)>;
+            let departures: Vec<(&f64, &str)>;
+            match index {
+                0 => {
+                    arrivals = message_records
+                        .iter()
+                        .filter(|message_record| message_record.target_id() == "processor-01")
+                        .map(|message_record| (message_record.time(), message_record.content()))
+                        .collect();
+                    departures = message_records
+                        .iter()
+                        .filter(|message_record| message_record.target_id() == "storage-01")
+                        .map(|message_record| (message_record.time(), message_record.content()))
+                        .collect();
+                }
+                _ => {
+                    arrivals = message_records
+                        .iter()
+                        .filter(|message_record| message_record.target_id() == "storage-02")
+                        .filter(|message_record| message_record.source_port() == "start")
+                        .map(|message_record| (message_record.time(), message_record.content()))
+                        .collect();
+                    departures = message_records
+                        .iter()
+                        .filter(|message_record| message_record.target_id() == "storage-02")
+                        .filter(|message_record| message_record.source_port() == "stop")
+                        .map(|message_record| (message_record.time(), message_record.content()))
+                        .collect();
+                }
+            }
+            let response_times: Vec<f64> = departures
+                .iter()
+                .map(|departure| -> Result<f64, SimulationError> {
+                    Ok(departure.0
+                        - arrivals
+                            .iter()
+                            .find(|arrival| {
+                                get_message_number(&arrival.1) == get_message_number(&departure.1)
+                            })
+                            .ok_or(SimulationError::DroppedMessageError)?
+                            .0)
+                })
+                .collect::<Result<Vec<f64>, SimulationError>>()?;
+            let mut response_times_sample = SteadyStateOutput::post(response_times);
+            response_times_sample.confidence_interval_mean(0.001)
+        },
+    )
+    .collect::<Result<Vec<ConfidenceInterval<f64>>, SimulationError>>()?;
     // Ensure confidence intervals overlap
     assert![
         response_times_confidence_intervals[0].lower()
@@ -198,4 +199,5 @@ fn closure_under_coupling() {
         response_times_confidence_intervals[1].lower()
             < response_times_confidence_intervals[0].upper()
     ];
+    Ok(())
 }
diff --git a/sim/tests/custom.rs b/sim/tests/custom.rs
index fb220db..d6f52e6 100644
--- a/sim/tests/custom.rs
+++ b/sim/tests/custom.rs
@@ -86,7 +86,7 @@ impl Reportable for Passive {
 impl ReportableModel for Passive {}
 
 #[test]
-fn step_n_with_custom_passive_model() {
+fn step_n_with_custom_passive_model() -> Result<(), SimulationError> {
     let models = [
         Model::new(
             String::from("generator-01"),
@@ -111,10 +111,11 @@ fn step_n_with_custom_passive_model() {
     )];
     let mut simulation = Simulation::post(models.to_vec(), connectors.to_vec());
     // 1 initialization event, and 2 events per generation
-    let messages = simulation.step_n(9).unwrap();
+    let messages = simulation.step_n(9)?;
     let generations_count = messages.len();
     let expected = 4; // 4 interarrivals from 9 steps
     assert_eq!(generations_count, expected);
+    Ok(())
 }
 
 #[test]
diff --git a/sim/tests/simulations.rs b/sim/tests/simulations.rs
index 91e4088..4279bf2 100644
--- a/sim/tests/simulations.rs
+++ b/sim/tests/simulations.rs
@@ -6,17 +6,18 @@ use sim::models::{
 };
 use sim::output_analysis::{IndependentSample, SteadyStateOutput};
 use sim::simulator::{Connector, Message, Simulation};
+use sim::utils::errors::SimulationError;
 
 fn epsilon() -> f64 {
     0.34
 }
 
-fn get_message_number(message: &str) -> &str {
-    message.split_whitespace().last().unwrap()
+fn get_message_number(message: &str) -> Option<&str> {
+    message.split_whitespace().last()
 }
 
 #[test]
-fn poisson_generator_processor_with_capacity() {
+fn poisson_generator_processor_with_capacity() -> Result<(), SimulationError> {
     let models = [
         Model::new(
             String::from("generator-01"),
@@ -68,7 +69,7 @@
     // A stage for processed job collection
     let mut simulation = Simulation::post(models.to_vec(), connectors.to_vec());
     // Sample size will be reduced during output analysis - initialization bias reduction through deletion
-    let message_records: Vec<Message> = simulation.step_n(3000).unwrap();
+    let message_records: Vec<Message> = simulation.step_n(3000)?;
     let departures: Vec<(&f64, &str)> = message_records
         .iter()
         .filter(|message_record| message_record.target_id() == "storage-01")
@@ -82,45 +83,51 @@
     // Response Times
     let response_times: Vec<f64> = departures
         .iter()
-        .map(|departure| {
-            departure.0
+        .map(|departure| -> Result<f64, SimulationError> {
+            Ok(departure.0
                 - arrivals
                     .iter()
                     .find(|arrival| {
                         get_message_number(&arrival.1) == get_message_number(&departure.1)
                     })
-                    .unwrap()
-                    .0
+                    .ok_or(SimulationError::DroppedMessageError)?
+                    .0)
         })
-        .collect();
+        .collect::<Result<Vec<f64>, SimulationError>>()?;
     // Response times are not independent
     // Varying queue size leads to auto-correlation
     // To combat this, use steady state output analysis with deletion+batching
     let mut response_times_sample = SteadyStateOutput::post(response_times);
-    let response_times_confidence_interval = response_times_sample
-        .confidence_interval_mean(0.001)
-        .unwrap();
+    let response_times_confidence_interval =
+        response_times_sample.confidence_interval_mean(0.001)?;
     // average number of jobs in the processor divided by the effective arrival rate (Little's Formula)
     let expected = (172285188.0 / 14316139.0) / (4766600.0 / 14316169.0);
     assert!(response_times_confidence_interval.lower() < expected);
     assert!(response_times_confidence_interval.upper() > expected);
 
     // Effective Arrival Rate
-    let last_processed_job = get_message_number(&departures.iter().last().unwrap().1);
+    let last_processed_job = get_message_number(
+        &departures
+            .iter()
+            .last()
+            .ok_or(SimulationError::DroppedMessageError)?
+            .1,
+    );
     let count_generated = arrivals
         .iter()
         .position(|arrival| get_message_number(&arrival.1) == last_processed_job)
-        .unwrap()
+        .ok_or(SimulationError::DroppedMessageError)?
         + 1;
     let count_processed = departures.len();
     // Effective arrival rate as the generated rate multiplied by the percent of jobs "served" (not ignored due to a full queue)
     let effective_arrival_rate = 0.5 * ((count_processed as f64) / (count_generated as f64));
     let expected = 4766600.0 / 14316169.0;
     assert!((effective_arrival_rate - expected).abs() / expected < epsilon());
+    Ok(())
 }
 
 #[test]
-fn step_until_activities() {
+fn step_until_activities() -> Result<(), SimulationError> {
     let models = [
         Model::new(
             String::from("generator-01"),
@@ -155,20 +162,20 @@
         // Refresh the models, but maintain the Uniform RNG for replication independence
         simulation.reset();
         simulation.put(models.to_vec(), connectors.to_vec());
-        let messages = simulation.step_until(100.0).unwrap();
+        let messages = simulation.step_until(100.0)?;
         generations_count.push(messages.len() as f64);
     }
-    let generations_per_replication = IndependentSample::post(generations_count).unwrap();
-    let generations_per_replication_ci = generations_per_replication
-        .confidence_interval_mean(0.001)
-        .unwrap();
+    let generations_per_replication = IndependentSample::post(generations_count)?;
+    let generations_per_replication_ci =
+        generations_per_replication.confidence_interval_mean(0.001)?;
     let expected = 50.0; // 50 interarrivals - 1/0.5 mean and 100 duration
     assert!(generations_per_replication_ci.lower() < expected);
     assert!(generations_per_replication_ci.upper() > expected);
+    Ok(())
 }
 
 #[test]
-fn non_stationary_generation() {
+fn non_stationary_generation() -> Result<(), SimulationError> {
     let models = [
         Model::new(
             String::from("generator-01"),
@@ -223,7 +230,7 @@
         // Refresh the models, but maintain the Uniform RNG for replication independence
         simulation.reset();
         simulation.put(models.to_vec(), connectors.to_vec());
-        let messages = simulation.step_until(480.0).unwrap();
+        let messages = simulation.step_until(480.0)?;
         let arrivals: Vec<&Message> = messages
             .iter()
            .filter(|message| message.target_id() == "processor-01")
@@ -231,21 +238,19 @@
        arrivals_count.push(arrivals.len() as f64);
         message_records.extend(messages);
     }
-    let arrivals_ci = IndependentSample::post(arrivals_count)
-        .unwrap()
-        .confidence_interval_mean(0.05)
-        .unwrap();
+    let arrivals_ci = IndependentSample::post(arrivals_count)?.confidence_interval_mean(0.05)?;
     // Confirm empirical CI and simulation output CI overlap
-    let empirical_arrivals = IndependentSample::post(vec![47.0, 42.0, 45.0, 34.0, 37.0]).unwrap();
-    let empirical_arrivals_ci = empirical_arrivals.confidence_interval_mean(0.05).unwrap();
+    let empirical_arrivals_ci = IndependentSample::post(vec![47.0, 42.0, 45.0, 34.0, 37.0])?
+        .confidence_interval_mean(0.05)?;
     assert!(
         arrivals_ci.lower() < empirical_arrivals_ci.upper()
             && arrivals_ci.upper() > empirical_arrivals_ci.lower()
     );
+    Ok(())
 }
 
 #[test]
-fn exclusive_gateway_proportions_chi_square() {
+fn exclusive_gateway_proportions_chi_square() -> Result<(), SimulationError> {
     let models = [
         Model::new(
             String::from("generator-01"),
@@ -334,7 +339,7 @@
     // 601 steps means 200 processed jobs (3 steps per gateway passthrough)
     // 1 initialization step
     for _x in 0..601 {
-        let messages_set: Vec<Message> = simulation.step().unwrap();
+        let messages_set: Vec<Message> = simulation.step()?;
         message_records.extend(messages_set);
     }
     let outputs = vec![
@@ -363,10 +368,11 @@
     // 3 bins, 2 dof, 0.01 alpha
     let chi_square_critical = 9.21;
     assert![chi_square < chi_square_critical];
+    Ok(())
 }
 
 #[test]
-fn gate_blocking_proportions() {
+fn gate_blocking_proportions() -> Result<(), SimulationError> {
     // Deactivation/activation switch at a much higher frequency than job arrival, to limit autocorrelation and initialization bias
     let models = [
         Model::new(
@@ -455,7 +461,7 @@
     simulation.put(models.to_vec(), connectors.to_vec());
     let mut message_records: Vec<Message> = Vec::new();
     for _x in 0..1000 {
-        let messages_set: Vec<Message> = simulation.step().unwrap();
+        let messages_set: Vec<Message> = simulation.step()?;
         message_records.extend(messages_set);
     }
     let arrivals = message_records
@@ -473,16 +479,14 @@
             passed.push(departures as f64 / arrivals as f64);
         }
     }
-    let passed_ci = IndependentSample::post(passed)
-        .unwrap()
-        .confidence_interval_mean(0.01)
-        .unwrap();
+    let passed_ci = IndependentSample::post(passed)?.confidence_interval_mean(0.01)?;
     // With no "processing" delay for the gate, we can expect the blocked/unblocked proportions to be 50%
     assert![passed_ci.lower() < 0.5 && 0.5 < passed_ci.upper()];
+    Ok(())
 }
 
 #[test]
-fn load_balancer_round_robin_outputs() {
+fn load_balancer_round_robin_outputs() -> Result<(), SimulationError> {
     // Deactivation/activation switch at a much higher frequency than job arrival, to limit autocorrelation and initialization bias
     let models = [
         Model::new(
@@ -568,7 +572,7 @@
     // 28 steps means 9 processed jobs
     // 3 steps per processed job
     // 1 step for initialization
-    let message_records: Vec<Message> = simulation.step_n(28).unwrap();
+    let message_records: Vec<Message> = simulation.step_n(28)?;
     let outputs = vec![
         message_records
             .iter()
@@ -586,10 +590,11 @@
     outputs.iter().for_each(|server_arrival_count| {
         assert_eq![*server_arrival_count, 3];
     });
+    Ok(())
 }
 
 #[test]
-fn injection_initiated_stored_value_exchange() {
+fn injection_initiated_stored_value_exchange() -> Result<(), SimulationError> {
     let models = [
         Model::new(
             String::from("storage-01"),
@@ -636,7 +641,7 @@
         String::from("42"),
     );
     simulation.inject_input(stored_value);
-    simulation.step().unwrap();
+    simulation.step()?;
     let transfer_request = Message::new(
         String::from("manual"),
         String::from("manual"),
@@ -646,7 +651,7 @@
         String::from(""),
     );
     simulation.inject_input(transfer_request);
-    simulation.step().unwrap();
+    simulation.step()?;
     let read_request = Message::new(
         String::from("manual"),
         String::from("manual"),
@@ -656,12 +661,13 @@
         String::from(""),
     );
     simulation.inject_input(read_request);
-    let messages: Vec<Message> = simulation.step().unwrap();
+    let messages: Vec<Message> = simulation.step()?;
     assert_eq![messages[0].content(), "42"];
+    Ok(())
 }
 
 #[test]
-fn parallel_gateway_splits_and_joins() {
+fn parallel_gateway_splits_and_joins() -> Result<(), SimulationError> {
     let models = [
         Model::new(
             String::from("generator-01"),
@@ -744,7 +750,7 @@
         ),
     ];
     let mut simulation = Simulation::post(models.to_vec(), connectors.to_vec());
-    let message_records: Vec<Message> = simulation.step_n(101).unwrap();
+    let message_records: Vec<Message> = simulation.step_n(101)?;
     let alpha_passes = message_records
         .iter()
         .filter(|message_record| message_record.target_port() == "alpha")
@@ -765,10 +771,11 @@
     assert_eq![beta_passes, delta_passes];
     assert_eq![delta_passes, storage_passes];
     assert![storage_passes > 0];
+    Ok(())
 }
 
 #[test]
-fn match_status_reporting() {
+fn match_status_reporting() -> Result<(), SimulationError> {
     let models = [
         Model::new(
             String::from("generator-01"),
@@ -794,18 +801,16 @@
     ];
     let connectors = [];
     let simulation = Simulation::post(models.to_vec(), connectors.to_vec());
+    assert_eq![simulation.get_status("generator-01")?, "Generating jobs"];
     assert_eq![
-        simulation.get_status("generator-01").unwrap(),
-        "Generating jobs"
-    ];
-    assert_eq![
-        simulation.get_status("load-balancer-01").unwrap(),
+        simulation.get_status("load-balancer-01")?,
         "Listening for requests"
     ];
+    Ok(())
 }
 
 #[test]
-fn stochastic_gate_blocking() {
+fn stochastic_gate_blocking() -> Result<(), SimulationError> {
     let models = [
         Model::new(
             String::from("generator-01"),
@@ -852,7 +857,7 @@
         ),
     ];
     let mut simulation = Simulation::post(models.to_vec(), connectors.to_vec());
-    let message_records: Vec<Message> = simulation.step_n(101).unwrap();
+    let message_records: Vec<Message> = simulation.step_n(101)?;
     let mut results: Vec<f64> = Vec::new();
     message_records
         .iter()
@@ -866,13 +871,14 @@
             message_record.target_id() == "stochastic-gate-01" && *index > passes
         })
         .for_each(|_fail| results.push(0.0));
-    let sample = IndependentSample::post(results).unwrap();
-    assert![sample.confidence_interval_mean(0.01).unwrap().lower() < 0.2];
-    assert![sample.confidence_interval_mean(0.01).unwrap().upper() > 0.2];
+    let sample = IndependentSample::post(results)?;
+    assert![sample.confidence_interval_mean(0.01)?.lower() < 0.2];
+    assert![sample.confidence_interval_mean(0.01)?.upper() > 0.2];
+    Ok(())
 }
 
 #[test]
-fn batch_sizing() {
+fn batch_sizing() -> Result<(), SimulationError> {
     let models = [
         Model::new(
             String::from("generator-01"),
@@ -922,7 +928,7 @@ fn batch_sizing() {
     let mut simulation = Simulation::post(models.to_vec(), connectors.to_vec());
     let mut batch_sizes: Vec<usize> = Vec::new();
     for _ in 0..10000 {
-        let message_records: Vec<Message> = simulation.step().unwrap();
+        let message_records: Vec<Message> = simulation.step()?;
         let batch_size = message_records
             .iter()
             .filter(|message_record| message_record.target_id() == "storage-01")
@@ -938,10 +944,11 @@ fn batch_sizing() {
     assert![exists_partial_batch];
     assert![exists_full_batch];
     assert![!exists_oversized_batch];
+    Ok(())
 }
 
 #[test]
-fn min_and_max_stopwatch() {
+fn min_and_max_stopwatch() -> Result<(), SimulationError> {
     let models = [
         Model::new(
             String::from("generator-01"),
@@ -1053,7 +1060,7 @@
         ),
     ];
     let mut simulation = Simulation::post(models.to_vec(), connectors.to_vec());
-    simulation.step_n(12).unwrap();
+    simulation.step_n(12)?;
     let minimum_fetch = Message::new(
         String::from("manual"),
         String::from("manual"),
@@ -1072,7 +1079,8 @@
         String::from("42"),
     );
     simulation.inject_input(maximum_fetch);
-    let responses = simulation.step_n(2).unwrap();
+    let responses = simulation.step_n(2)?;
     // Assert the minimum duration job and maximum duration job are not the same
     assert![responses[0].content() != responses[1].content()];
+    Ok(())
 }