From 331ff1558b195a4545496a1632a3e2df07bdc5f8 Mon Sep 17 00:00:00 2001 From: ndebuhr Date: Sat, 22 May 2021 16:10:50 +0000 Subject: [PATCH] Add a Batcher model --- src/models/batcher.rs | 176 ++++++++++++++++++++++++++++++++++++++++++ src/models/mod.rs | 2 + tests/simulations.rs | 70 +++++++++++++++++ 3 files changed, 248 insertions(+) create mode 100644 src/models/batcher.rs diff --git a/src/models/batcher.rs b/src/models/batcher.rs new file mode 100644 index 0000000..33e95a3 --- /dev/null +++ b/src/models/batcher.rs @@ -0,0 +1,176 @@ +use std::f64::INFINITY; + +use serde::{Deserialize, Serialize}; + +use super::model_trait::{AsModel, SerializableModel}; +use super::ModelMessage; +use crate::simulator::Services; +use crate::utils::error::SimulationError; + +use sim_derive::SerializableModel; + +/// The batching process begins when the batcher receives a job. It will +/// then accept additional jobs, adding them to a batch with the first job, +/// until a max batching time or max batch size is reached - whichever comes +/// first. If the simultaneous arrival of multiple jobs causes the max batch +/// size to be exceeded, then the excess jobs will spillover into the next +/// batching period. In this case of excess jobs, the next batching period +/// begins immediately after the release of the preceding batch. If there +/// are no excess jobs, the batcher will become passive, and wait for a job +/// arrival to initiate the batching process. 
+#[derive(Debug, Clone, Deserialize, Serialize, SerializableModel)]
+#[serde(rename_all = "camelCase")]
+pub struct Batcher {
+    ports_in: PortsIn,
+    ports_out: PortsOut,
+    // Batch is released when either limit is hit - whichever comes first.
+    max_batch_time: f64,
+    max_batch_size: usize,
+    #[serde(default)]
+    state: State,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct PortsIn {
+    job: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct PortsOut {
+    job: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct State {
+    phase: Phase,
+    until_next_event: f64,
+    // Job contents accumulated into the in-progress batch (the generic
+    // parameter was lost in transit; jobs hold ModelMessage content strings).
+    jobs: Vec<String>,
+}
+
+impl Default for State {
+    fn default() -> Self {
+        State {
+            phase: Phase::Passive,
+            until_next_event: INFINITY,
+            jobs: Vec::new(),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+enum Phase {
+    Passive,  // Doing nothing
+    Batching, // Building a batch
+    Release,  // Releasing a batch
+}
+
+impl Batcher {
+    pub fn new(
+        job_in_port: String,
+        job_out_port: String,
+        max_batch_time: f64,
+        max_batch_size: usize,
+    ) -> Self {
+        Self {
+            ports_in: PortsIn { job: job_in_port },
+            ports_out: PortsOut { job: job_out_port },
+            max_batch_time,
+            max_batch_size,
+            state: Default::default(),
+        }
+    }
+
+    // Append an arriving job to the batch already under construction.
+    fn add_to_batch(&mut self, incoming_message: &ModelMessage) -> Result<(), SimulationError> {
+        self.state.phase = Phase::Batching;
+        self.state.jobs.push(incoming_message.content.clone());
+        Ok(())
+    }
+
+    // First arrival while passive: open a new batch and start the batch timer.
+    fn start_batch(&mut self, incoming_message: &ModelMessage) -> Result<(), SimulationError> {
+        self.state.phase = Phase::Batching;
+        self.state.until_next_event = self.max_batch_time;
+        self.state.jobs.push(incoming_message.content.clone());
+        Ok(())
+    }
+
+    // Arrival reaches the max batch size: schedule an immediate release.
+    fn fill_batch(&mut self, incoming_message: &ModelMessage) -> Result<(), SimulationError> {
+        self.state.phase = Phase::Release;
+        self.state.until_next_event = 0.0;
+        self.state.jobs.push(incoming_message.content.clone());
+        Ok(())
+    }
+
+    // Release every queued job as a single batch, then go passive until the
+    // next arrival initiates a new batching period.
+    fn release_full_queue(&mut self) -> Result<Vec<ModelMessage>, SimulationError> {
+        self.state.phase = Phase::Passive;
+        self.state.until_next_event = INFINITY;
+        Ok((0..self.state.jobs.len())
+            .map(|_| ModelMessage {
+                port_name: self.ports_out.job.clone(),
+                content: self.state.jobs.remove(0),
+            })
+            .collect())
+    }
+
+    // Release exactly max_batch_size jobs; spillover jobs seed the next
+    // batching period, which begins immediately.
+    fn release_partial_queue(&mut self) -> Result<Vec<ModelMessage>, SimulationError> {
+        self.state.phase = Phase::Batching;
+        self.state.until_next_event = self.max_batch_time;
+        Ok((0..self.max_batch_size)
+            .map(|_| ModelMessage {
+                port_name: self.ports_out.job.clone(),
+                content: self.state.jobs.remove(0),
+            })
+            .collect())
+    }
+}
+
+impl AsModel for Batcher {
+    fn status(&self) -> String {
+        match self.state.phase {
+            Phase::Passive => String::from("Passive"),
+            Phase::Batching => String::from("Creating batch"),
+            Phase::Release => String::from("Releasing batch"),
+        }
+    }
+
+    fn events_ext(
+        &mut self,
+        incoming_message: &ModelMessage,
+        _services: &mut Services,
+    ) -> Result<Vec<ModelMessage>, SimulationError> {
+        if self.state.phase == Phase::Batching && self.state.jobs.len() + 1 < self.max_batch_size {
+            self.add_to_batch(incoming_message)?;
+        } else if self.state.phase == Phase::Passive
+            && self.state.jobs.len() + 1 < self.max_batch_size
+        {
+            self.start_batch(incoming_message)?;
+        } else if self.state.jobs.len() + 1 >= self.max_batch_size {
+            self.fill_batch(incoming_message)?;
+        } else {
+            return Err(SimulationError::InvalidModelState);
+        }
+        Ok(Vec::new())
+    }
+
+    fn events_int(
+        &mut self,
+        _services: &mut Services,
+    ) -> Result<Vec<ModelMessage>, SimulationError> {
+        // Bug fix: the second branch previously repeated the first condition
+        // (jobs.len() <= max_batch_size), leaving release_partial_queue
+        // unreachable and releasing oversized spillover queues as one batch -
+        // which would violate the max batch size.
+        if self.state.jobs.len() <= self.max_batch_size {
+            self.release_full_queue()
+        } else {
+            self.release_partial_queue()
+        }
+    }
+
+    fn time_advance(&mut self, time_delta: f64) {
+        self.state.until_next_event -= time_delta;
+    }
+
+    fn until_next_event(&self) -> f64 {
+        self.state.until_next_event
+    }
+}
diff --git a/src/models/mod.rs
b/src/models/mod.rs index e55fb93..dffa859 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -3,6 +3,7 @@ //! specifies the requirements of any additional custom models, via the //! `Model` trait. +pub mod batcher; pub mod exclusive_gateway; pub mod gate; pub mod generator; @@ -17,6 +18,7 @@ pub mod model_factory; pub mod model_repr; pub mod model_trait; +pub use self::batcher::Batcher; pub use self::exclusive_gateway::ExclusiveGateway; pub use self::gate::Gate; pub use self::generator::Generator; diff --git a/tests/simulations.rs b/tests/simulations.rs index 51103db..8d05f1b 100644 --- a/tests/simulations.rs +++ b/tests/simulations.rs @@ -898,3 +898,73 @@ fn stochastic_gate_blocking() { assert![sample.confidence_interval_mean(0.01).unwrap().lower() < 0.2]; assert![sample.confidence_interval_mean(0.01).unwrap().upper() > 0.2]; } + +#[test] +fn batch_sizing() { + let models = [ + Model::new( + String::from("generator-01"), + Box::new(Generator::new( + ContinuousRandomVariable::Exp { lambda: 1.0 }, + None, + String::from("job"), + false, + false, + )), + ), + Model::new( + String::from("batcher-01"), + Box::new(Batcher::new( + String::from("job"), + String::from("job"), + 10.0, // 10 seconds max batching time + 10, // 10 jobs max batch size + )), + ), + Model::new( + String::from("storage-01"), + Box::new(Storage::new( + String::from("store"), + String::from("read"), + String::from("stored"), + false, + false, + )), + ), + ]; + let connectors = [ + Connector::new( + String::from("connector-01"), + String::from("generator-01"), + String::from("batcher-01"), + String::from("job"), + String::from("job"), + ), + Connector::new( + String::from("connector-02"), + String::from("batcher-01"), + String::from("storage-01"), + String::from("job"), + String::from("store"), + ), + ]; + let mut simulation = Simulation::post(models.to_vec(), connectors.to_vec()); + let mut batch_sizes: Vec = Vec::new(); + for _ in 0..10000 { + let message_records: Vec = 
simulation.step().unwrap(); + let batch_size = message_records + .iter() + .filter(|message_record| message_record.target_id() == "storage-01") + .count(); + batch_sizes.push(batch_size); + } + // Partial batches should exist + let exists_partial_batch = batch_sizes.iter().any(|batch_size| *batch_size < 10); + // Full batches should exist + let exists_full_batch = batch_sizes.iter().any(|batch_size| *batch_size == 10); + // Batches larger than the max batch size should not exist + let exists_oversized_batch = batch_sizes.iter().any(|batch_size| *batch_size > 10); + assert![exists_partial_batch]; + assert![exists_full_batch]; + assert![!exists_oversized_batch]; +}