Add a Batcher model
ndebuhr committed May 22, 2021
1 parent e53f5ac commit 331ff15
Showing 3 changed files with 248 additions and 0 deletions.
176 changes: 176 additions & 0 deletions src/models/batcher.rs
@@ -0,0 +1,176 @@
use std::f64::INFINITY;

use serde::{Deserialize, Serialize};

use super::model_trait::{AsModel, SerializableModel};
use super::ModelMessage;
use crate::simulator::Services;
use crate::utils::error::SimulationError;

use sim_derive::SerializableModel;

/// The batching process begins when the batcher receives a job. It will
/// then accept additional jobs, adding them to a batch with the first job,
/// until a max batching time or max batch size is reached - whichever comes
/// first. If the simultaneous arrival of multiple jobs causes the max batch
/// size to be exceeded, then the excess jobs will spill over into the next
/// batching period. In this case of excess jobs, the next batching period
/// begins immediately after the release of the preceding batch. If there
/// are no excess jobs, the batcher will become passive, and wait for a job
/// arrival to initiate the batching process.
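///
/// A minimal construction sketch; the `sim::models::Batcher` path is an
/// assumption here, and the port names and limits mirror the `batch_sizing`
/// integration test:
///
/// ```ignore
/// use sim::models::Batcher;
///
/// // Accept jobs on "job" and release batches on "job", with a 10.0 second
/// // max batching time and a max batch size of 10 jobs
/// let batcher = Batcher::new(
///     String::from("job"),
///     String::from("job"),
///     10.0,
///     10,
/// );
/// ```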
#[derive(Debug, Clone, Deserialize, Serialize, SerializableModel)]
#[serde(rename_all = "camelCase")]
pub struct Batcher {
    ports_in: PortsIn,
    ports_out: PortsOut,
    max_batch_time: f64,
    max_batch_size: usize,
    #[serde(default)]
    state: State,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PortsIn {
    job: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PortsOut {
    job: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct State {
    phase: Phase,
    until_next_event: f64,
    jobs: Vec<String>,
}

impl Default for State {
    fn default() -> Self {
        State {
            phase: Phase::Passive,
            until_next_event: INFINITY,
            jobs: Vec::new(),
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
enum Phase {
    Passive,  // Doing nothing
    Batching, // Building a batch
    Release,  // Releasing a batch
}

impl Batcher {
    pub fn new(
        job_in_port: String,
        job_out_port: String,
        max_batch_time: f64,
        max_batch_size: usize,
    ) -> Self {
        Self {
            ports_in: PortsIn { job: job_in_port },
            ports_out: PortsOut { job: job_out_port },
            max_batch_time,
            max_batch_size,
            state: Default::default(),
        }
    }

    fn add_to_batch(&mut self, incoming_message: &ModelMessage) -> Result<(), SimulationError> {
        self.state.phase = Phase::Batching;
        self.state.jobs.push(incoming_message.content.clone());
        Ok(())
    }

    fn start_batch(&mut self, incoming_message: &ModelMessage) -> Result<(), SimulationError> {
        self.state.phase = Phase::Batching;
        self.state.until_next_event = self.max_batch_time;
        self.state.jobs.push(incoming_message.content.clone());
        Ok(())
    }

    fn fill_batch(&mut self, incoming_message: &ModelMessage) -> Result<(), SimulationError> {
        self.state.phase = Phase::Release;
        self.state.until_next_event = 0.0;
        self.state.jobs.push(incoming_message.content.clone());
        Ok(())
    }

    fn release_full_queue(&mut self) -> Result<Vec<ModelMessage>, SimulationError> {
        self.state.phase = Phase::Passive;
        self.state.until_next_event = INFINITY;
        Ok((0..self.state.jobs.len())
            .map(|_| ModelMessage {
                port_name: self.ports_out.job.clone(),
                content: self.state.jobs.remove(0),
            })
            .collect())
    }

    fn release_partial_queue(&mut self) -> Result<Vec<ModelMessage>, SimulationError> {
        self.state.phase = Phase::Batching;
        self.state.until_next_event = self.max_batch_time;
        Ok((0..self.max_batch_size)
            .map(|_| ModelMessage {
                port_name: self.ports_out.job.clone(),
                content: self.state.jobs.remove(0),
            })
            .collect())
    }
}

impl AsModel for Batcher {
    fn status(&self) -> String {
        match self.state.phase {
            Phase::Passive => String::from("Passive"),
            Phase::Batching => String::from("Creating batch"),
            Phase::Release => String::from("Releasing batch"),
        }
    }

    fn events_ext(
        &mut self,
        incoming_message: &ModelMessage,
        _services: &mut Services,
    ) -> Result<Vec<ModelMessage>, SimulationError> {
        if self.state.phase == Phase::Batching && self.state.jobs.len() + 1 < self.max_batch_size {
            // A batch is in progress and still has room - add the job to it
            self.add_to_batch(incoming_message)?;
        } else if self.state.phase == Phase::Passive
            && self.state.jobs.len() + 1 < self.max_batch_size
        {
            // First arrival - start a new batch and the batching timer
            self.start_batch(incoming_message)?;
        } else if self.state.jobs.len() + 1 >= self.max_batch_size {
            // This arrival fills (or overfills) the batch - schedule an immediate release
            self.fill_batch(incoming_message)?;
        } else {
            return Err(SimulationError::InvalidModelState);
        }
        Ok(Vec::new())
    }

    fn events_int(
        &mut self,
        _services: &mut Services,
    ) -> Result<Vec<ModelMessage>, SimulationError> {
        if self.state.jobs.len() <= self.max_batch_size {
            // Everything queued fits within one batch - release it all and go passive
            self.release_full_queue()
        } else {
            // Simultaneous arrivals overfilled the batch - release a full batch
            // and continue batching the excess jobs
            self.release_partial_queue()
        }
    }

    fn time_advance(&mut self, time_delta: f64) {
        self.state.until_next_event -= time_delta;
    }

    fn until_next_event(&self) -> f64 {
        self.state.until_next_event
    }
}
2 changes: 2 additions & 0 deletions src/models/mod.rs
@@ -3,6 +3,7 @@
//! specifies the requirements of any additional custom models, via the
//! `Model` trait.

pub mod batcher;
pub mod exclusive_gateway;
pub mod gate;
pub mod generator;
@@ -17,6 +18,7 @@ pub mod model_factory;
pub mod model_repr;
pub mod model_trait;

pub use self::batcher::Batcher;
pub use self::exclusive_gateway::ExclusiveGateway;
pub use self::gate::Gate;
pub use self::generator::Generator;
70 changes: 70 additions & 0 deletions tests/simulations.rs
@@ -898,3 +898,73 @@ fn stochastic_gate_blocking() {
    assert![sample.confidence_interval_mean(0.01).unwrap().lower() < 0.2];
    assert![sample.confidence_interval_mean(0.01).unwrap().upper() > 0.2];
}

#[test]
fn batch_sizing() {
    let models = [
        Model::new(
            String::from("generator-01"),
            Box::new(Generator::new(
                ContinuousRandomVariable::Exp { lambda: 1.0 },
                None,
                String::from("job"),
                false,
                false,
            )),
        ),
        Model::new(
            String::from("batcher-01"),
            Box::new(Batcher::new(
                String::from("job"),
                String::from("job"),
                10.0, // 10 seconds max batching time
                10,   // 10 jobs max batch size
            )),
        ),
        Model::new(
            String::from("storage-01"),
            Box::new(Storage::new(
                String::from("store"),
                String::from("read"),
                String::from("stored"),
                false,
                false,
            )),
        ),
    ];
    let connectors = [
        Connector::new(
            String::from("connector-01"),
            String::from("generator-01"),
            String::from("batcher-01"),
            String::from("job"),
            String::from("job"),
        ),
        Connector::new(
            String::from("connector-02"),
            String::from("batcher-01"),
            String::from("storage-01"),
            String::from("job"),
            String::from("store"),
        ),
    ];
    let mut simulation = Simulation::post(models.to_vec(), connectors.to_vec());
    let mut batch_sizes: Vec<usize> = Vec::new();
    for _ in 0..10000 {
        let message_records: Vec<Message> = simulation.step().unwrap();
        let batch_size = message_records
            .iter()
            .filter(|message_record| message_record.target_id() == "storage-01")
            .count();
        batch_sizes.push(batch_size);
    }
    // Partial batches should exist
    let exists_partial_batch = batch_sizes.iter().any(|batch_size| *batch_size < 10);
    // Full batches should exist
    let exists_full_batch = batch_sizes.iter().any(|batch_size| *batch_size == 10);
    // Batches larger than the max batch size should not exist
    let exists_oversized_batch = batch_sizes.iter().any(|batch_size| *batch_size > 10);
    assert![exists_partial_batch];
    assert![exists_full_batch];
    assert![!exists_oversized_batch];
}
