diff --git a/sim/src/models/parallel_gateway.rs b/sim/src/models/parallel_gateway.rs index 02f3bed..2457754 100644 --- a/sim/src/models/parallel_gateway.rs +++ b/sim/src/models/parallel_gateway.rs @@ -6,8 +6,8 @@ use serde::{Deserialize, Serialize}; use super::model_trait::{AsModel, SerializableModel}; use super::ModelMessage; use crate::simulator::Services; +use crate::utils::default_records_port_name; use crate::utils::error::SimulationError; -use crate::utils::{populate_history_port, populate_snapshot_port}; use sim_derive::SerializableModel; @@ -21,109 +21,181 @@ pub struct ParallelGateway { ports_in: PortsIn, ports_out: PortsOut, #[serde(default)] - state: State, - #[serde(default)] - snapshot: Metrics, + store_records: bool, #[serde(default)] - history: Vec, + state: State, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct PortsIn { flow_paths: Vec, - snapshot: Option, - history: Option, + #[serde(default = "default_records_port_name")] + records: String, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct PortsOut { flow_paths: Vec, - snapshot: Option, - history: Option, + #[serde(default = "default_records_port_name")] + records: String, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct State { - event_list: Vec, + phase: Phase, + until_next_event: f64, collections: HashMap, + records: Vec, } impl Default for State { fn default() -> Self { - let initalization_event = ScheduledEvent { - time: 0.0, - event: Event::Run, - }; - State { - event_list: vec![initalization_event], + Self { + phase: Phase::Processing, + until_next_event: 0.0, collections: HashMap::new(), + records: Vec::new(), } } } -#[derive(Debug, Clone, Serialize, Deserialize)] -enum Event { - Run, - SendJob, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct ScheduledEvent { - time: f64, - event: Event, +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +enum Phase { + Processing, + RecordsFetch, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -struct Metrics { - last_arrival: Option<(String, f64)>, - last_departure: Option<(String, f64)>, -} - -impl Default for Metrics { - fn default() -> Self { - Metrics { - last_arrival: None, - last_departure: None, - } - } +struct Record { + content: String, + time: f64, } impl ParallelGateway { pub fn new( flow_paths_in: Vec, flow_paths_out: Vec, - snapshot_metrics: bool, - history_metrics: bool, + store_records: bool, ) -> Self { Self { ports_in: PortsIn { flow_paths: flow_paths_in, - snapshot: populate_snapshot_port(snapshot_metrics), - history: populate_history_port(history_metrics), + records: default_records_port_name(), }, ports_out: PortsOut { flow_paths: flow_paths_out, - snapshot: populate_snapshot_port(snapshot_metrics), - history: populate_history_port(history_metrics), + records: default_records_port_name(), }, + store_records, state: Default::default(), - snapshot: Default::default(), - history: Default::default(), } } - fn need_snapshot_metrics(&self) -> bool { - self.ports_in.snapshot.is_some() && self.ports_out.snapshot.is_some() + fn full_collection(&self) -> Option<(&String, &usize)> { + self.state + .collections + .iter() + .find(|(_, count)| **count == self.ports_in.flow_paths.len()) + } + + fn increment_collection( + &mut self, + incoming_message: &ModelMessage, + _services: &mut Services, + ) -> Result<(), SimulationError> { + *self + .state + .collections + .entry(incoming_message.content.clone()) + .or_insert(0) += 1; + self.state.until_next_event = 0.0; + Ok(()) } - fn need_historical_metrics(&self) -> bool { - self.need_snapshot_metrics() - && self.ports_in.history.is_some() - && self.ports_out.history.is_some() + fn request_records( + &mut self, + _incoming_message: &ModelMessage, + _services: &mut Services, + ) -> Result<(), SimulationError> { + self.state.phase = Phase::RecordsFetch; + self.state.until_next_event = 0.0; + Ok(()) + } + + fn ignore_request( + &mut self, + _incoming_message: &ModelMessage, + _services: &mut Services, + ) -> Result<(), SimulationError> { + Ok(()) + } + + fn release_records(&mut self) -> Result, SimulationError> { + self.state.phase = Phase::Processing; + self.state.until_next_event = 0.0; + Ok(vec![ModelMessage { + port_name: self.ports_out.records.clone(), + content: serde_json::to_string(&self.state.records).unwrap(), + }]) + } + + fn send_and_save_job( + &mut self, + services: &mut Services, + ) -> Result, SimulationError> { + self.state.until_next_event = 0.0; + let completed_collection = self + .full_collection() + .ok_or(SimulationError::InvalidModelState)? + .0 + .to_string(); + self.state.collections.remove(&completed_collection); + self.state.records.push(Record { + content: completed_collection.clone(), + time: services.global_time(), + }); + Ok(self + .ports_out + .flow_paths + .iter() + .fold(Vec::new(), |mut messages, flow_path| { + messages.push(ModelMessage { + port_name: flow_path.clone(), + content: completed_collection.clone(), + }); + messages + })) + } + + fn send_job(&mut self) -> Result, SimulationError> { + self.state.until_next_event = 0.0; + let completed_collection = self + .full_collection() + .ok_or(SimulationError::InvalidModelState)? + .0 + .to_string(); + self.state.collections.remove(&completed_collection); + Ok(self + .ports_out + .flow_paths + .iter() + .fold(Vec::new(), |mut messages, flow_path| { + messages.push(ModelMessage { + port_name: flow_path.clone(), + content: completed_collection.clone(), + }); + messages + })) + } + + fn passivate(&mut self) -> Result, SimulationError> { + self.state.phase = Phase::Processing; + self.state.until_next_event = INFINITY; + Ok(Vec::new()) } } @@ -137,26 +209,14 @@ impl AsModel for ParallelGateway { incoming_message: &ModelMessage, services: &mut Services, ) -> Result, SimulationError> { - // Possible metrics updates - if self.need_snapshot_metrics() { - self.snapshot.last_arrival = - Some((incoming_message.content.clone(), services.global_time())); - } - if self.need_historical_metrics() { - self.history.push(self.snapshot.clone()); - } - // State changes - let matching_collection = self - .state - .collections - .entry(incoming_message.content.clone()) - .or_insert(0); - *matching_collection += 1; - if *matching_collection == self.ports_in.flow_paths.len() { - self.state.event_list.push(ScheduledEvent { - time: 0.0, - event: Event::SendJob, - }) + if incoming_message.port_name == self.ports_in.records && self.store_records { + self.request_records(incoming_message, services)?; + } else if incoming_message.port_name == self.ports_in.records && !self.store_records { + self.ignore_request(incoming_message, services)?; + } else if incoming_message.port_name != self.ports_in.records { + self.increment_collection(incoming_message, services)?; + } else { + return Err(SimulationError::InvalidModelState); } Ok(Vec::new()) } @@ -165,68 +225,30 @@ impl AsModel for ParallelGateway { &mut self, services: &mut Services, ) -> Result, SimulationError> { - let mut outgoing_messages: Vec = Vec::new(); - let events = self.state.event_list.clone(); - self.state.event_list = self - .state - .event_list - .iter() - .filter(|scheduled_event| scheduled_event.time != 0.0) - .cloned() - .collect(); - events - .iter() - .filter(|scheduled_event| scheduled_event.time == 0.0) - .map(|scheduled_event| { - match scheduled_event.event { - Event::Run => {} - Event::SendJob => { - let completed_collection = self - .state - .collections - .iter() - .find(|(_, count)| **count == self.ports_in.flow_paths.len()) - .ok_or(SimulationError::InvalidModelState)? - .0 - .to_string(); - self.ports_out.flow_paths.iter().for_each(|port_out| { - outgoing_messages.push(ModelMessage { - port_name: String::from(port_out), - content: completed_collection.clone(), - }); - }); - self.state.collections.remove(&completed_collection); - // Possible metrics updates - if self.need_snapshot_metrics() { - self.snapshot.last_departure = - Some((completed_collection, services.global_time())); - } - if self.need_historical_metrics() { - self.history.push(self.snapshot.clone()); - } - } - } - Ok(Vec::new()) - }) - .find(|result| result.is_err()) - .unwrap_or(Ok(outgoing_messages)) + if self.state.phase == Phase::RecordsFetch { + self.release_records() + } else if self.state.phase == Phase::Processing + && self.store_records + && self.full_collection().is_some() + { + self.send_and_save_job(services) + } else if self.state.phase == Phase::Processing + && !self.store_records + && self.full_collection().is_some() + { + self.send_job() + } else if self.state.phase == Phase::Processing && self.full_collection().is_none() { + self.passivate() + } else { + Err(SimulationError::InvalidModelState) + } } fn time_advance(&mut self, time_delta: f64) { - self.state - .event_list - .iter_mut() - .for_each(|scheduled_event| { - scheduled_event.time -= time_delta; - }); + self.state.until_next_event -= time_delta; } fn until_next_event(&self) -> f64 { - self.state - .event_list - .iter() - .fold(INFINITY, |until_next_event, event| { - f64::min(until_next_event, event.time) - }) + self.state.until_next_event } } diff --git a/sim/tests/simulations.rs b/sim/tests/simulations.rs index 6c3e09e..9ef56da 100644 --- a/sim/tests/simulations.rs +++ b/sim/tests/simulations.rs @@ -692,7 +692,6 @@ fn parallel_gateway_splits_and_joins() { String::from("delta"), ], false, - false, )), ), Model::new( @@ -705,7 +704,6 @@ fn parallel_gateway_splits_and_joins() { ], vec![String::from("out")], false, - false, )), ), Model::new(