From 6a162b56d597d687b9ab7d5e85b7a856d567a45d Mon Sep 17 00:00:00 2001 From: Neal DeBuhr Date: Sat, 26 Feb 2022 20:16:51 +0000 Subject: [PATCH] Add a coupled model for hierarchical model construction --- .github/workflows/ci.yml | 7 +- sim/src/models/coupled.rs | 290 ++++++++++++++++++++++++++++++++++++++ sim/src/models/mod.rs | 4 + sim/tests/coupled.rs | 200 ++++++++++++++++++++++++++ 4 files changed, 499 insertions(+), 2 deletions(-) create mode 100644 sim/src/models/coupled.rs create mode 100644 sim/tests/coupled.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index aa5681a..6eb4ea5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,9 +29,12 @@ jobs: ~/.cargo/git ./sim/target key: ${{ runner.os }}-cargo-${{ matrix.rust }}-${{ hashFiles('./sim/Cargo.lock') }} - - name: Run Tests + - name: Run Tests (No Optional Features) working-directory: ./sim - run: cargo test --features simx,console_error_panic_hook -- --nocapture + run: cargo test -- --nocapture + - name: Run Tests (All Optional Features) + working-directory: ./sim + run: cargo test --all-features -- --nocapture wasm-pack: name: Test (wasm) diff --git a/sim/src/models/coupled.rs b/sim/src/models/coupled.rs new file mode 100644 index 0000000..1b5c648 --- /dev/null +++ b/sim/src/models/coupled.rs @@ -0,0 +1,290 @@ +use std::f64::INFINITY; + +use serde::{Deserialize, Serialize}; + +use super::model_trait::{DevsModel, Reportable, ReportableModel, SerializableModel}; +use super::{Model, ModelMessage, ModelRecord}; + +use crate::simulator::Services; +use crate::utils::errors::SimulationError; + +use sim_derive::SerializableModel; + +#[derive(Clone, Deserialize, Serialize, SerializableModel)] +#[serde(rename_all = "camelCase")] +pub struct Coupled { + ports_in: PortsIn, + ports_out: PortsOut, + components: Vec, + external_input_couplings: Vec, + external_output_couplings: Vec, + internal_couplings: Vec, + #[serde(default)] + state: State, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct PortsIn { + flow_paths: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct PortsOut { + flow_paths: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ExternalInputCoupling { + #[serde(rename = "targetID")] + pub target_id: String, + pub source_port: String, + pub target_port: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ExternalOutputCoupling { + #[serde(rename = "sourceID")] + pub source_id: String, + pub source_port: String, + pub target_port: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct InternalCoupling { + #[serde(rename = "sourceID")] + pub source_id: String, + #[serde(rename = "targetID")] + pub target_id: String, + pub source_port: String, + pub target_port: String, +} + +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct State { + parked_messages: Vec, + records: Vec, +} + +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ParkedMessage { + component_id: String, + port: String, + content: String, +} + +impl Coupled { + pub fn new( + ports_in: Vec, + ports_out: Vec, + components: Vec, + external_input_couplings: Vec, + external_output_couplings: Vec, + internal_couplings: Vec, + ) -> Self { + Self { + ports_in: PortsIn { + flow_paths: ports_in, + }, + ports_out: PortsOut { + flow_paths: ports_out, + }, + components, + external_input_couplings, + external_output_couplings, + internal_couplings, + state: State::default(), + } + } + + fn external_targets(&self, source_id: &str, source_port: &str) -> Vec { + // Vec + + self.external_output_couplings + .iter() + .filter_map(|coupling| { + if coupling.source_id == source_id && coupling.source_port == source_port { + Some(coupling.target_port.to_string()) + } else { + None + } + }) + .collect() + } + + fn internal_targets(&self, source_id: &str, source_port: &str) -> Vec<(String, String)> { + // Vec<(target_id, target_port)> + + self.internal_couplings + .iter() + .filter_map(|coupling| { + if coupling.source_id == source_id && coupling.source_port == source_port { + Some(( + coupling.target_id.to_string(), + coupling.target_port.to_string(), + )) + } else { + None + } + }) + .collect() + } +} + +impl DevsModel for Coupled { + fn events_ext( + &mut self, + incoming_message: &ModelMessage, + services: &mut Services, + ) -> Result<(), SimulationError> { + let external_input_couplings = self.external_input_couplings.clone(); + self.components + .iter_mut() + .filter_map(|component| { + external_input_couplings + .iter() + .filter(|coupling| coupling.source_port == incoming_message.port_name) + .map(|coupling| (&coupling.target_id, &coupling.target_port)) + .find_map(|(target_id, target_port)| { + if target_id == component.id() { + Some(component.events_ext( + &ModelMessage { + port_name: target_port.to_string(), + content: incoming_message.content.clone(), + }, + services, + )) + } else { + None + } + }) + }) + .collect() + } + + fn events_int( + &mut self, + services: &mut Services, + ) -> Result, SimulationError> { + // Find the (internal message) events_ext relevant models (parked message id == component id) + let ext_transitioning_component_triggers: Vec<(usize, String, String)> = (0..self + .components + .len()) + .map(|component_index| -> Vec<(usize, String, String)> { + self.state + .parked_messages + .iter() + .filter_map(|parked_message| { + if parked_message.component_id == self.components[component_index].id() { + Some(( + component_index, + parked_message.port.to_string(), + parked_message.content.to_string(), + )) + } else { + None + } + }) + .collect() + }) + .flatten() + .collect(); + ext_transitioning_component_triggers + .iter() + .map( + |(component_index, message_port, message_content)| -> Result<(), SimulationError> { + self.components[*component_index].events_ext( + &ModelMessage { + port_name: message_port.to_string(), + content: message_content.to_string(), + }, + services, + ) + }, + ) + .collect::, SimulationError>>()?; + self.state.parked_messages = Vec::new(); + // Find the events_int relevant models (until_next_event == 0.0) + // Run events_int for each model, and compile the internal and external messages + // Store the internal messages in the Coupled model struct, and output the external messages + let int_transitioning_component_indexes: Vec = (0..self.components.len()) + .filter(|component_index| self.components[*component_index].until_next_event() == 0.0) + .collect(); + Ok(int_transitioning_component_indexes + .iter() + .map( + |component_index| -> Result, SimulationError> { + Ok(self.components[*component_index] + .events_int(services)? + .iter() + .map(|outgoing_message| -> Vec { + // For internal messages (those transmitted on internal couplings), store the messages + // as Parked Messages, to be ingested by the target components on the next simulation step + self.internal_targets( + self.components[*component_index].id(), + &outgoing_message.port_name, + ) + .iter() + .for_each(|(target_id, target_port)| { + self.state.parked_messages.push(ParkedMessage { + component_id: target_id.to_string(), + port: target_port.to_string(), + content: outgoing_message.content.clone(), + }); + }); + // For external messages (those transmitted on external output couplings), prepare the + // output as standard events_int output + self.external_targets( + self.components[*component_index].id(), + &outgoing_message.port_name, + ) + .iter() + .map(|target_port| ModelMessage { + port_name: target_port.to_string(), + content: outgoing_message.content.clone(), + }) + .collect() + }) + .flatten() + .collect()) + }, + ) + .flatten() + .flatten() + .collect()) + } + + fn time_advance(&mut self, time_delta: f64) { + self.components.iter_mut().for_each(|component| { + component.time_advance(time_delta); + }); + } + + fn until_next_event(&self) -> f64 { + self.components.iter().fold(INFINITY, |min, component| { + f64::min(min, component.until_next_event()) + }) + } +} + +impl Reportable for Coupled { + fn status(&self) -> String { + if self.state.parked_messages.is_empty() { + format!["Processing {} messages", self.state.parked_messages.len()] + } else { + String::from("Processing no messages") + } + } + + fn records(&self) -> &Vec { + &self.state.records + } +} + +impl ReportableModel for Coupled {} diff --git a/sim/src/models/mod.rs b/sim/src/models/mod.rs index 5af6640..9bff445 100644 --- a/sim/src/models/mod.rs +++ b/sim/src/models/mod.rs @@ -6,6 +6,8 @@ use serde::{Deserialize, Serialize}; pub mod batcher; +#[cfg(not(feature = "simx"))] +pub mod coupled; pub mod exclusive_gateway; pub mod gate; pub mod generator; @@ -22,6 +24,8 @@ pub mod model_repr; pub mod model_trait; pub use self::batcher::Batcher; +#[cfg(not(feature = "simx"))] +pub use self::coupled::{Coupled, ExternalInputCoupling, ExternalOutputCoupling, InternalCoupling}; pub use self::exclusive_gateway::ExclusiveGateway; pub use self::gate::Gate; pub use self::generator::Generator; diff --git a/sim/tests/coupled.rs b/sim/tests/coupled.rs new file mode 100644 index 0000000..033de1b --- /dev/null +++ b/sim/tests/coupled.rs @@ -0,0 +1,200 @@ +#[cfg(not(feature = "simx"))] +#[test] +fn closure_under_coupling() { + use sim::input_modeling::ContinuousRandomVariable; + use sim::models::*; + use sim::output_analysis::*; + use sim::simulator::*; + + fn get_message_number(message: &str) -> &str { + message.split_whitespace().last().unwrap() + } + + let atomic_models = vec![ + Model::new( + String::from("generator-01"), + Box::new(Generator::new( + ContinuousRandomVariable::Exp { lambda: 0.007 }, + None, + String::from("job"), + false, + )), + ), + Model::new( + String::from("processor-01"), + Box::new(Processor::new( + ContinuousRandomVariable::Exp { lambda: 0.011 }, + Some(14), + String::from("job"), + String::from("processed"), + false, + )), + ), + Model::new( + String::from("storage-01"), + Box::new(Storage::new( + String::from("store"), + String::from("read"), + String::from("stored"), + false, + )), + ), + ]; + let atomic_connectors = vec![ + Connector::new( + String::from("connector-01"), + String::from("generator-01"), + String::from("processor-01"), + String::from("job"), + String::from("job"), + ), + Connector::new( + String::from("connector-02"), + String::from("processor-01"), + String::from("storage-01"), + String::from("processed"), + String::from("store"), + ), + ]; + let coupled_models = vec![ + Model::new( + String::from("coupled-01"), + Box::new(Coupled::new( + Vec::new(), + vec![String::from("start"), String::from("stop")], + vec![ + Model::new( + String::from("generator-01"), + Box::new(Generator::new( + ContinuousRandomVariable::Exp { lambda: 0.007 }, + None, + String::from("job"), + false, + )), + ), + Model::new( + String::from("processor-01"), + Box::new(Processor::new( + ContinuousRandomVariable::Exp { lambda: 0.011 }, + Some(14), + String::from("job"), + String::from("processed"), + false, + )), + ), + ], + Vec::new(), + vec![ + ExternalOutputCoupling { + source_id: String::from("generator-01"), + source_port: String::from("job"), + target_port: String::from("start"), + }, + ExternalOutputCoupling { + source_id: String::from("processor-01"), + source_port: String::from("processed"), + target_port: String::from("stop"), + }, + ], + vec![InternalCoupling { + source_id: String::from("generator-01"), + target_id: String::from("processor-01"), + source_port: String::from("job"), + target_port: String::from("job"), + }], + )), + ), + Model::new( + String::from("storage-02"), + Box::new(Storage::new( + String::from("store"), + String::from("read"), + String::from("stored"), + false, + )), + ), + ]; + let coupled_connectors = vec![ + Connector::new( + String::from("connector-01"), + String::from("coupled-01"), + String::from("storage-02"), + String::from("start"), + String::from("store"), + ), + Connector::new( + String::from("connector-02"), + String::from("coupled-01"), + String::from("storage-02"), + String::from("stop"), + String::from("store"), + ), + ]; + let response_times_confidence_intervals: Vec> = [ + (atomic_models, atomic_connectors), + (coupled_models, coupled_connectors), + ] + .iter() + .enumerate() + .map(|(index, (models, connectors))| { + let mut simulation = Simulation::post(models.to_vec(), connectors.to_vec()); + let message_records: Vec = simulation.step_n(1000).unwrap(); + let arrivals: Vec<(&f64, &str)>; + let departures: Vec<(&f64, &str)>; + match index { + 0 => { + arrivals = message_records + .iter() + .filter(|message_record| message_record.target_id() == "processor-01") + .map(|message_record| (message_record.time(), message_record.content())) + .collect(); + departures = message_records + .iter() + .filter(|message_record| message_record.target_id() == "storage-01") + .map(|message_record| (message_record.time(), message_record.content())) + .collect(); + } + _ => { + arrivals = message_records + .iter() + .filter(|message_record| message_record.target_id() == "storage-02") + .filter(|message_record| message_record.source_port() == "start") + .map(|message_record| (message_record.time(), message_record.content())) + .collect(); + departures = message_records + .iter() + .filter(|message_record| message_record.target_id() == "storage-02") + .filter(|message_record| message_record.source_port() == "stop") + .map(|message_record| (message_record.time(), message_record.content())) + .collect(); + } + } + let response_times: Vec = departures + .iter() + .map(|departure| { + departure.0 + - arrivals + .iter() + .find(|arrival| { + get_message_number(&arrival.1) == get_message_number(&departure.1) + }) + .unwrap() + .0 + }) + .collect(); + let mut response_times_sample = SteadyStateOutput::post(response_times); + response_times_sample + .confidence_interval_mean(0.001) + .unwrap() + }) + .collect(); + // Ensure confidence intervals overlap + assert![ + response_times_confidence_intervals[0].lower() + < response_times_confidence_intervals[1].upper() + ]; + assert![ + response_times_confidence_intervals[1].lower() + < response_times_confidence_intervals[0].upper() + ]; +}