Skip to content

Commit

Permalink
Update the Parallel Gateway model to better align with the DEVS forma…
Browse files Browse the repository at this point in the history
…lism - use Moore machine style outputs and decouple conditionality from state changes
  • Loading branch information
ndebuhr committed Jun 20, 2021
1 parent cfc2630 commit f99ede3
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 131 deletions.
280 changes: 151 additions & 129 deletions sim/src/models/parallel_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use serde::{Deserialize, Serialize};
use super::model_trait::{AsModel, SerializableModel};
use super::ModelMessage;
use crate::simulator::Services;
use crate::utils::default_records_port_name;
use crate::utils::error::SimulationError;
use crate::utils::{populate_history_port, populate_snapshot_port};

use sim_derive::SerializableModel;

Expand All @@ -21,109 +21,181 @@ pub struct ParallelGateway {
ports_in: PortsIn,
ports_out: PortsOut,
#[serde(default)]
state: State,
#[serde(default)]
snapshot: Metrics,
store_records: bool,
#[serde(default)]
history: Vec<Metrics>,
state: State,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PortsIn {
flow_paths: Vec<String>,
snapshot: Option<String>,
history: Option<String>,
#[serde(default = "default_records_port_name")]
records: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PortsOut {
flow_paths: Vec<String>,
snapshot: Option<String>,
history: Option<String>,
#[serde(default = "default_records_port_name")]
records: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct State {
event_list: Vec<ScheduledEvent>,
phase: Phase,
until_next_event: f64,
collections: HashMap<String, usize>,
records: Vec<Record>,
}

impl Default for State {
fn default() -> Self {
let initalization_event = ScheduledEvent {
time: 0.0,
event: Event::Run,
};
State {
event_list: vec![initalization_event],
Self {
phase: Phase::Processing,
until_next_event: 0.0,
collections: HashMap::new(),
records: Vec::new(),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
enum Event {
Run,
SendJob,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct ScheduledEvent {
time: f64,
event: Event,
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
enum Phase {
Processing,
RecordsFetch,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Metrics {
last_arrival: Option<(String, f64)>,
last_departure: Option<(String, f64)>,
}

impl Default for Metrics {
fn default() -> Self {
Metrics {
last_arrival: None,
last_departure: None,
}
}
struct Record {
content: String,
time: f64,
}

impl ParallelGateway {
pub fn new(
flow_paths_in: Vec<String>,
flow_paths_out: Vec<String>,
snapshot_metrics: bool,
history_metrics: bool,
store_records: bool,
) -> Self {
Self {
ports_in: PortsIn {
flow_paths: flow_paths_in,
snapshot: populate_snapshot_port(snapshot_metrics),
history: populate_history_port(history_metrics),
records: default_records_port_name(),
},
ports_out: PortsOut {
flow_paths: flow_paths_out,
snapshot: populate_snapshot_port(snapshot_metrics),
history: populate_history_port(history_metrics),
records: default_records_port_name(),
},
store_records,
state: Default::default(),
snapshot: Default::default(),
history: Default::default(),
}
}

fn need_snapshot_metrics(&self) -> bool {
self.ports_in.snapshot.is_some() && self.ports_out.snapshot.is_some()
fn full_collection(&self) -> Option<(&String, &usize)> {
self.state
.collections
.iter()
.find(|(_, count)| **count == self.ports_in.flow_paths.len())
}

fn increment_collection(
&mut self,
incoming_message: &ModelMessage,
_services: &mut Services,
) -> Result<(), SimulationError> {
*self
.state
.collections
.entry(incoming_message.content.clone())
.or_insert(0) += 1;
self.state.until_next_event = 0.0;
Ok(())
}

fn need_historical_metrics(&self) -> bool {
self.need_snapshot_metrics()
&& self.ports_in.history.is_some()
&& self.ports_out.history.is_some()
fn request_records(
&mut self,
_incoming_message: &ModelMessage,
_services: &mut Services,
) -> Result<(), SimulationError> {
self.state.phase = Phase::RecordsFetch;
self.state.until_next_event = 0.0;
Ok(())
}

fn ignore_request(
&mut self,
_incoming_message: &ModelMessage,
_services: &mut Services,
) -> Result<(), SimulationError> {
Ok(())
}

fn release_records(&mut self) -> Result<Vec<ModelMessage>, SimulationError> {
self.state.phase = Phase::Processing;
self.state.until_next_event = 0.0;
Ok(vec![ModelMessage {
port_name: self.ports_out.records.clone(),
content: serde_json::to_string(&self.state.records).unwrap(),
}])
}

fn send_and_save_job(
&mut self,
services: &mut Services,
) -> Result<Vec<ModelMessage>, SimulationError> {
self.state.until_next_event = 0.0;
let completed_collection = self
.full_collection()
.ok_or(SimulationError::InvalidModelState)?
.0
.to_string();
self.state.collections.remove(&completed_collection);
self.state.records.push(Record {
content: completed_collection.clone(),
time: services.global_time(),
});
Ok(self
.ports_out
.flow_paths
.iter()
.fold(Vec::new(), |mut messages, flow_path| {
messages.push(ModelMessage {
port_name: flow_path.clone(),
content: completed_collection.clone(),
});
messages
}))
}

fn send_job(&mut self) -> Result<Vec<ModelMessage>, SimulationError> {
self.state.until_next_event = 0.0;
let completed_collection = self
.full_collection()
.ok_or(SimulationError::InvalidModelState)?
.0
.to_string();
self.state.collections.remove(&completed_collection);
Ok(self
.ports_out
.flow_paths
.iter()
.fold(Vec::new(), |mut messages, flow_path| {
messages.push(ModelMessage {
port_name: flow_path.clone(),
content: completed_collection.clone(),
});
messages
}))
}

fn passivate(&mut self) -> Result<Vec<ModelMessage>, SimulationError> {
self.state.phase = Phase::Processing;
self.state.until_next_event = INFINITY;
Ok(Vec::new())
}
}

Expand All @@ -137,26 +209,14 @@ impl AsModel for ParallelGateway {
incoming_message: &ModelMessage,
services: &mut Services,
) -> Result<Vec<ModelMessage>, SimulationError> {
// Possible metrics updates
if self.need_snapshot_metrics() {
self.snapshot.last_arrival =
Some((incoming_message.content.clone(), services.global_time()));
}
if self.need_historical_metrics() {
self.history.push(self.snapshot.clone());
}
// State changes
let matching_collection = self
.state
.collections
.entry(incoming_message.content.clone())
.or_insert(0);
*matching_collection += 1;
if *matching_collection == self.ports_in.flow_paths.len() {
self.state.event_list.push(ScheduledEvent {
time: 0.0,
event: Event::SendJob,
})
if incoming_message.port_name == self.ports_in.records && self.store_records {
self.request_records(incoming_message, services)?;
} else if incoming_message.port_name == self.ports_in.records && !self.store_records {
self.ignore_request(incoming_message, services)?;
} else if incoming_message.port_name != self.ports_in.records {
self.increment_collection(incoming_message, services)?;
} else {
return Err(SimulationError::InvalidModelState);
}
Ok(Vec::new())
}
Expand All @@ -165,68 +225,30 @@ impl AsModel for ParallelGateway {
&mut self,
services: &mut Services,
) -> Result<Vec<ModelMessage>, SimulationError> {
let mut outgoing_messages: Vec<ModelMessage> = Vec::new();
let events = self.state.event_list.clone();
self.state.event_list = self
.state
.event_list
.iter()
.filter(|scheduled_event| scheduled_event.time != 0.0)
.cloned()
.collect();
events
.iter()
.filter(|scheduled_event| scheduled_event.time == 0.0)
.map(|scheduled_event| {
match scheduled_event.event {
Event::Run => {}
Event::SendJob => {
let completed_collection = self
.state
.collections
.iter()
.find(|(_, count)| **count == self.ports_in.flow_paths.len())
.ok_or(SimulationError::InvalidModelState)?
.0
.to_string();
self.ports_out.flow_paths.iter().for_each(|port_out| {
outgoing_messages.push(ModelMessage {
port_name: String::from(port_out),
content: completed_collection.clone(),
});
});
self.state.collections.remove(&completed_collection);
// Possible metrics updates
if self.need_snapshot_metrics() {
self.snapshot.last_departure =
Some((completed_collection, services.global_time()));
}
if self.need_historical_metrics() {
self.history.push(self.snapshot.clone());
}
}
}
Ok(Vec::new())
})
.find(|result| result.is_err())
.unwrap_or(Ok(outgoing_messages))
if self.state.phase == Phase::RecordsFetch {
self.release_records()
} else if self.state.phase == Phase::Processing
&& self.store_records
&& self.full_collection().is_some()
{
self.send_and_save_job(services)
} else if self.state.phase == Phase::Processing
&& !self.store_records
&& self.full_collection().is_some()
{
self.send_job()
} else if self.state.phase == Phase::Processing && self.full_collection().is_none() {
self.passivate()
} else {
Err(SimulationError::InvalidModelState)
}
}

fn time_advance(&mut self, time_delta: f64) {
self.state
.event_list
.iter_mut()
.for_each(|scheduled_event| {
scheduled_event.time -= time_delta;
});
self.state.until_next_event -= time_delta;
}

fn until_next_event(&self) -> f64 {
self.state
.event_list
.iter()
.fold(INFINITY, |until_next_event, event| {
f64::min(until_next_event, event.time)
})
self.state.until_next_event
}
}
2 changes: 0 additions & 2 deletions sim/tests/simulations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,6 @@ fn parallel_gateway_splits_and_joins() {
String::from("delta"),
],
false,
false,
)),
),
Model::new(
Expand All @@ -705,7 +704,6 @@ fn parallel_gateway_splits_and_joins() {
],
vec![String::from("out")],
false,
false,
)),
),
Model::new(
Expand Down

0 comments on commit f99ede3

Please sign in to comment.