diff --git a/.vscode/settings.json b/.vscode/settings.json index 80aead1799a..d650a5fd0e7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -13,6 +13,15 @@ } } ], + "editor.semanticTokenColorCustomizations": { + "enabled": true, + "rules": { + "*.unsafe:rust": { + "foreground": "#ea1708", + "fontStyle": "bold" + } + } + }, "files.watcherExclude": { "**/target": true }, diff --git a/Cargo.toml b/Cargo.toml index ab8fef76bc8..224bad27797 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,6 +57,7 @@ opt-level = "s" [workspace.lints.rust] unused_qualifications = "warn" +unsafe_op_in_unsafe_fn = "warn" [workspace.lints.clippy] allow_attributes = "warn" diff --git a/hydro_deploy/hydro_cli/src/lib.rs b/hydro_deploy/hydro_cli/src/lib.rs index 2c1de84faff..663abe4cc90 100644 --- a/hydro_deploy/hydro_cli/src/lib.rs +++ b/hydro_deploy/hydro_cli/src/lib.rs @@ -1,6 +1,7 @@ #![expect( unused_qualifications, non_local_definitions, + unsafe_op_in_unsafe_fn, reason = "for pyo3 generated code" )] diff --git a/hydroflow_plus/src/cycle.rs b/hydroflow_plus/src/cycle.rs index 4d61cc1d30b..1ac45600f7b 100644 --- a/hydroflow_plus/src/cycle.rs +++ b/hydroflow_plus/src/cycle.rs @@ -1,4 +1,4 @@ -use crate::location::Location; +use crate::location::{Location, LocationId}; use crate::staging_util::Invariant; pub enum ForwardRefMarker {} @@ -9,7 +9,7 @@ pub trait DeferTick { } pub trait CycleComplete<'a, T> { - fn complete(self, ident: syn::Ident); + fn complete(self, ident: syn::Ident, expected_location: LocationId); } pub trait CycleCollection<'a, T>: CycleComplete<'a, T> { @@ -30,24 +30,26 @@ pub trait CycleCollectionWithInitial<'a, T>: CycleComplete<'a, T> { /// See [`crate::FlowBuilder`] for an explainer on the type parameters. 
pub struct ForwardRef<'a, S: CycleComplete<'a, ForwardRefMarker>> { pub(crate) ident: syn::Ident, + pub(crate) expected_location: LocationId, pub(crate) _phantom: Invariant<'a, S>, } impl<'a, S: CycleComplete<'a, ForwardRefMarker>> ForwardRef<'a, S> { pub fn complete(self, stream: S) { let ident = self.ident; - S::complete(stream, ident) + S::complete(stream, ident, self.expected_location) } } pub struct TickCycle<'a, S: CycleComplete<'a, TickCycleMarker> + DeferTick> { pub(crate) ident: syn::Ident, + pub(crate) expected_location: LocationId, pub(crate) _phantom: Invariant<'a, S>, } impl<'a, S: CycleComplete<'a, TickCycleMarker> + DeferTick> TickCycle<'a, S> { pub fn complete_next_tick(self, stream: S) { let ident = self.ident; - S::complete(stream.defer_tick(), ident) + S::complete(stream.defer_tick(), ident, self.expected_location) } } diff --git a/hydroflow_plus/src/location/cluster/mod.rs b/hydroflow_plus/src/location/cluster/mod.rs index cfd83e195d8..6d2b71cc362 100644 --- a/hydroflow_plus/src/location/cluster/mod.rs +++ b/hydroflow_plus/src/location/cluster/mod.rs @@ -20,14 +20,10 @@ pub struct Cluster<'a, C> { pub trait IsCluster { type Tag; - fn id(&self) -> usize; } impl IsCluster for Cluster<'_, C> { type Tag = C; - fn id(&self) -> usize { - self.id - } } impl<'a, C> Cluster<'a, C> { @@ -125,8 +121,14 @@ where where Self: Sized, { + let cluster_id = if let LocationId::Cluster(id) = ctx.root().id() { + id + } else { + unreachable!() + }; + let ident = syn::Ident::new( - &format!("__hydroflow_plus_cluster_self_id_{}", ctx.root().id()), + &format!("__hydroflow_plus_cluster_self_id_{}", cluster_id), Span::call_site(), ); let root = get_this_crate(); diff --git a/hydroflow_plus/src/location/mod.rs b/hydroflow_plus/src/location/mod.rs index 61693cb7c66..70e39d36a35 100644 --- a/hydroflow_plus/src/location/mod.rs +++ b/hydroflow_plus/src/location/mod.rs @@ -60,7 +60,7 @@ pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) { } pub trait 
Location<'a>: Clone { - type Root; + type Root: Location<'a>; fn root(&self) -> Self::Root; @@ -160,7 +160,16 @@ pub trait Location<'a>: Clone { ) } - fn source_interval( + /// Generates a stream with values emitted at a fixed interval, with + /// each value being the current time (as a [`tokio::time::Instant`]). + /// + /// The clock source used is monotonic, so elements will be emitted in + /// increasing order. + /// + /// # Safety + /// Because this stream is generated by an OS timer, it will be + /// non-deterministic: each timestamp will be arbitrary. + unsafe fn source_interval( &self, interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a, ) -> Stream @@ -172,7 +181,17 @@ pub trait Location<'a>: Clone { ))) } - fn source_interval_delayed( + /// Generates a stream with values emitted at a fixed interval (with an + /// initial delay), with each value being the current time + /// (as a [`tokio::time::Instant`]). + /// + /// The clock source used is monotonic, so elements will be emitted in + /// increasing order. + /// + /// # Safety + /// Because this stream is generated by an OS timer, it will be + /// non-deterministic: each timestamp will be arbitrary. 
+ unsafe fn source_interval_delayed( &self, delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a, interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a, @@ -212,6 +231,7 @@ pub trait Location<'a>: Clone { ( ForwardRef { ident: ident.clone(), + expected_location: self.id(), _phantom: PhantomData, }, S::create_source(ident, self.clone()), diff --git a/hydroflow_plus/src/location/tick.rs b/hydroflow_plus/src/location/tick.rs index 809a750c9bc..7d23bec7fe0 100644 --- a/hydroflow_plus/src/location/tick.rs +++ b/hydroflow_plus/src/location/tick.rs @@ -1,6 +1,7 @@ use std::marker::PhantomData; use proc_macro2::Span; +use sealed::sealed; use stageleft::{q, QuotedWithContext}; use super::{Cluster, Location, LocationId, Process}; @@ -12,10 +13,50 @@ use crate::cycle::{ use crate::ir::{HfPlusNode, HfPlusSource}; use crate::{Bounded, Optional, Singleton, Stream}; +#[sealed] pub trait NoTick {} +#[sealed] impl NoTick for Process<'_, T> {} +#[sealed] impl NoTick for Cluster<'_, T> {} +#[sealed] +pub trait NoTimestamp {} +#[sealed] +impl NoTimestamp for Process<'_, T> {} +#[sealed] +impl NoTimestamp for Cluster<'_, T> {} +#[sealed] +impl<'a, L: Location<'a>> NoTimestamp for Tick {} + +#[derive(Clone)] +pub struct Timestamped { + pub(crate) tick: Tick, +} + +impl<'a, L: Location<'a>> Location<'a> for Timestamped { + type Root = L::Root; + + fn root(&self) -> Self::Root { + self.tick.root() + } + + fn id(&self) -> LocationId { + self.tick.id() + } + + fn flow_state(&self) -> &FlowState { + self.tick.flow_state() + } + + fn is_top_level() -> bool { + L::is_top_level() + } +} + +#[sealed] +impl NoTick for Timestamped {} + /// Marks the stream as being inside the single global clock domain. 
#[derive(Clone)] pub struct Tick { @@ -53,13 +94,20 @@ impl<'a, L: Location<'a>> Tick { batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a, ) -> Stream<(), Self, Bounded> where - L: NoTick, + L: NoTick + NoTimestamp, { - self.l + let out = self + .l .spin() - .flat_map(q!(move |_| 0..batch_size)) + .flat_map_ordered(q!(move |_| 0..batch_size)) .map(q!(|_| ())) - .tick_batch(self) + .timestamped(self); + + unsafe { + // SAFETY: at runtime, `spin` produces a single value per tick, + // so each batch is guaranteed to be the same size. + out.tick_batch() + } } pub fn singleton( @@ -69,7 +117,10 @@ impl<'a, L: Location<'a>> Tick { where L: NoTick, { - self.outer().singleton(e).latest_tick(self) + unsafe { + // SAFETY: a top-level singleton produces the same value each tick + self.outer().singleton(e).timestamped(self).latest_tick() + } } pub fn singleton_first_tick( @@ -118,12 +169,46 @@ impl<'a, L: Location<'a>> Tick { ( ForwardRef { ident: ident.clone(), + expected_location: self.id(), _phantom: PhantomData, }, S::create_source(ident, self.clone()), ) } + pub fn forward_ref_timestamped< + S: CycleCollection<'a, ForwardRefMarker, Location = Timestamped>, + >( + &self, + ) -> (ForwardRef<'a, S>, S) { + let next_id = { + let on_id = match self.l.id() { + LocationId::Process(id) => id, + LocationId::Cluster(id) => id, + LocationId::Tick(_, _) => panic!(), + LocationId::ExternalProcess(_) => panic!(), + }; + + let mut flow_state = self.flow_state().borrow_mut(); + let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default(); + + let id = *next_id_entry; + *next_id_entry += 1; + id + }; + + let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site()); + + ( + ForwardRef { + ident: ident.clone(), + expected_location: self.id(), + _phantom: PhantomData, + }, + S::create_source(ident, Timestamped { tick: self.clone() }), + ) + } + pub fn cycle + DeferTick>( &self, ) -> (TickCycle<'a, S>, S) @@ -151,6 +236,7 @@ impl<'a, L: Location<'a>> Tick 
{ ( TickCycle { ident: ident.clone(), + expected_location: self.id(), _phantom: PhantomData, }, S::create_source(ident, self.clone()), @@ -187,6 +273,7 @@ impl<'a, L: Location<'a>> Tick { ( TickCycle { ident: ident.clone(), + expected_location: self.id(), _phantom: PhantomData, }, S::create_source(ident, initial, self.clone()), diff --git a/hydroflow_plus/src/optional.rs b/hydroflow_plus/src/optional.rs index bfde8c2ef5e..6d422d14d5b 100644 --- a/hydroflow_plus/src/optional.rs +++ b/hydroflow_plus/src/optional.rs @@ -9,7 +9,10 @@ use syn::parse_quote; use crate::builder::FLOW_USED_MESSAGE; use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker}; use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource, TeeNode}; +use crate::location::tick::{NoTimestamp, Timestamped}; use crate::location::{check_matching_location, LocationId, NoTick}; +use crate::singleton::ZipResult; +use crate::stream::NoOrder; use crate::{Bounded, Location, Singleton, Stream, Tick, Unbounded}; pub struct Optional { @@ -61,7 +64,12 @@ impl<'a, T, L: Location<'a>> CycleCollection<'a, TickCycleMarker> } impl<'a, T, L: Location<'a>> CycleComplete<'a, TickCycleMarker> for Optional, Bounded> { - fn complete(self, ident: syn::Ident) { + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); self.location .flow_state() .borrow_mut() @@ -94,7 +102,12 @@ impl<'a, T, L: Location<'a>> CycleCollection<'a, ForwardRefMarker> } impl<'a, T, L: Location<'a>> CycleComplete<'a, ForwardRefMarker> for Optional, Bounded> { - fn complete(self, ident: syn::Ident) { + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); self.location .flow_state() .borrow_mut() @@ -127,7 +140,12 @@ impl<'a, T, L: Location<'a> + NoTick, B> CycleCollection<'a, ForwardRefMarker> } impl<'a, T, L: 
Location<'a> + NoTick, B> CycleComplete<'a, ForwardRefMarker> for Optional { - fn complete(self, ident: syn::Ident) { + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); self.location .flow_state() .borrow_mut() @@ -179,15 +197,6 @@ impl<'a, T: Clone, L: Location<'a>, B> Clone for Optional { } impl<'a, T, L: Location<'a>, B> Optional { - // TODO(shadaj): this is technically incorrect; we should only return the first element of the stream - pub fn into_stream(self) -> Stream { - if L::is_top_level() { - panic!("Converting an optional to a stream is not yet supported at the top level"); - } - - Stream::new(self.location, self.ir_node.into_inner()) - } - pub fn map U + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional { let f = f.splice_fn1_ctx(&self.location).into(); Optional::new( @@ -199,7 +208,7 @@ impl<'a, T, L: Location<'a>, B> Optional { ) } - pub fn flat_map, F: Fn(T) -> I + 'a>( + pub fn flat_map_ordered, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream { @@ -213,11 +222,32 @@ impl<'a, T, L: Location<'a>, B> Optional { ) } - pub fn flatten(self) -> Stream + pub fn flat_map_unordered, F: Fn(T) -> I + 'a>( + self, + f: impl IntoQuotedMut<'a, F, L>, + ) -> Stream { + let f = f.splice_fn1_ctx(&self.location).into(); + Stream::new( + self.location, + HfPlusNode::FlatMap { + f, + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn flatten_ordered(self) -> Stream where T: IntoIterator, { - self.flat_map(q!(|v| v)) + self.flat_map_ordered(q!(|v| v)) + } + + pub fn flatten_unordered(self) -> Stream + where + T: IntoIterator, + { + self.flat_map_unordered(q!(|v| v)) } pub fn filter bool + 'a>( @@ -350,48 +380,123 @@ impl<'a, T, L: Location<'a>> Optional { self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) } - pub fn then(self, value: Singleton) -> Optional { + pub fn then(self, value: Singleton) -> 
Optional + where + Singleton: ZipResult< + 'a, + Optional<(), L, Bounded>, + Location = L, + Out = Optional<(U, ()), L, Bounded>, + >, + { value.continue_if(self) } + + pub fn into_stream(self) -> Stream { + if L::is_top_level() { + panic!("Converting an optional to a stream is not yet supported at the top level"); + } + + Stream::new(self.location, self.ir_node.into_inner()) + } } -impl<'a, T, L: Location<'a> + NoTick, B> Optional { - pub fn latest_tick(self, tick: &Tick) -> Optional, Bounded> { +impl<'a, T, L: Location<'a> + NoTick, B> Optional, B> { + /// Given a tick, returns an optional value corresponding to a snapshot of the optional + /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all + /// relevant data that contributed to the snapshot at tick `t`. + /// + /// # Safety + /// Because this picks a snapshot of an optional whose value is continuously changing, + /// the output optional has a non-deterministic value since the snapshot can be at an + /// arbitrary point in time. + pub unsafe fn latest_tick(self) -> Optional, Bounded> { Optional::new( - tick.clone(), + self.location.tick, HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), ) } - pub fn tick_samples(self) -> Stream { + pub fn drop_timestamp(self) -> Optional { + Optional::new(self.location.tick.l, self.ir_node.into_inner()) + } +} + +impl<'a, T, L: Location<'a> + NoTick, B> Optional { + pub fn timestamped(self, tick: &Tick) -> Optional, B> { + Optional::new( + Timestamped { tick: tick.clone() }, + self.ir_node.into_inner(), + ) + } + + /// Eagerly samples the optional as fast as possible, returning a stream of snapshots + /// with order corresponding to increasing prefixes of data contributing to the optional. + /// + /// # Safety + /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due + /// to non-deterministic batching and arrival of inputs, the output stream is + /// non-deterministic. 
+ pub unsafe fn sample_eager(self) -> Stream { let tick = self.location.tick(); - self.latest_tick(&tick).all_ticks() + + unsafe { + // SAFETY: source of intentional non-determinism + self.timestamped(&tick) + .latest_tick() + .all_ticks() + .drop_timestamp() + } } - pub fn sample_every( + /// Given a time interval, returns a stream corresponding to snapshots of the optional + /// value taken at various points in time. Because the input optional may be + /// [`Unbounded`], there are no guarantees on what these snapshots are other than they + /// represent the value of the optional given some prefix of the streams leading up to + /// it. + /// + /// # Safety + /// The output stream is non-deterministic in which elements are sampled, since this + /// is controlled by a clock. + pub unsafe fn sample_every( self, interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a, - ) -> Stream { - let samples = self.location.source_interval(interval); + ) -> Stream + where + L: NoTimestamp, + { + let samples = unsafe { + // SAFETY: source of intentional non-determinism + self.location.source_interval(interval) + }; let tick = self.location.tick(); - self.latest_tick(&tick) - .continue_if(samples.tick_batch(&tick).first()) - .all_ticks() + unsafe { + // SAFETY: source of intentional non-determinism + self.timestamped(&tick) + .latest_tick() + .continue_if(samples.timestamped(&tick).tick_batch().first()) + .all_ticks() + .drop_timestamp() + } } } impl<'a, T, L: Location<'a>> Optional, Bounded> { - pub fn all_ticks(self) -> Stream { + pub fn all_ticks(self) -> Stream, Unbounded> { Stream::new( - self.location.outer().clone(), + Timestamped { + tick: self.location, + }, HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } - pub fn latest(self) -> Optional { + pub fn latest(self) -> Optional, Unbounded> { Optional::new( - self.location.outer().clone(), + Timestamped { + tick: self.location, + }, HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } 
diff --git a/hydroflow_plus/src/rewrites/persist_pullup.rs b/hydroflow_plus/src/rewrites/persist_pullup.rs index 21c2a130eb9..7176dfde899 100644 --- a/hydroflow_plus/src/rewrites/persist_pullup.rs +++ b/hydroflow_plus/src/rewrites/persist_pullup.rs @@ -168,7 +168,13 @@ mod tests { let process = flow.process::<()>(); let tick = process.tick(); - let before_tee = process.source_iter(q!(0..10)).tick_batch(&tick).persist(); + let before_tee = unsafe { + process + .source_iter(q!(0..10)) + .timestamped(&tick) + .tick_batch() + .persist() + }; before_tee .clone() diff --git a/hydroflow_plus/src/rewrites/properties.rs b/hydroflow_plus/src/rewrites/properties.rs index f09b8a11926..2f0323ab779 100644 --- a/hydroflow_plus/src/rewrites/properties.rs +++ b/hydroflow_plus/src/rewrites/properties.rs @@ -121,13 +121,16 @@ mod tests { let counter_func = q!(|count: &mut i32, _| *count += 1); let _ = database.add_commutative_tag(counter_func, &tick); - process - .source_iter(q!(vec![])) - .map(q!(|string: String| (string, ()))) - .tick_batch(&tick) - .fold_keyed(q!(|| 0), counter_func) - .all_ticks() - .for_each(q!(|(string, count)| println!("{}: {}", string, count))); + unsafe { + process + .source_iter(q!(vec![])) + .map(q!(|string: String| (string, ()))) + .timestamped(&tick) + .tick_batch() + } + .fold_keyed(q!(|| 0), counter_func) + .all_ticks() + .for_each(q!(|(string, count)| println!("{}: {}", string, count))); let built = flow .optimize_with(|ir| properties_optimize(ir, &database)) diff --git a/hydroflow_plus/src/singleton.rs b/hydroflow_plus/src/singleton.rs index f89957e51dc..819b3c06dfc 100644 --- a/hydroflow_plus/src/singleton.rs +++ b/hydroflow_plus/src/singleton.rs @@ -11,6 +11,7 @@ use crate::cycle::{ TickCycleMarker, }; use crate::ir::{HfPlusLeaf, HfPlusNode, TeeNode}; +use crate::location::tick::{NoTimestamp, Timestamped}; use crate::location::{check_matching_location, Location, LocationId, NoTick, Tick}; use crate::{Bounded, Optional, Stream, Unbounded}; @@ -68,7 
+69,12 @@ impl<'a, T, L: Location<'a>> CycleCollectionWithInitial<'a, TickCycleMarker> } impl<'a, T, L: Location<'a>> CycleComplete<'a, TickCycleMarker> for Singleton, Bounded> { - fn complete(self, ident: syn::Ident) { + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); self.location .flow_state() .borrow_mut() @@ -103,7 +109,12 @@ impl<'a, T, L: Location<'a>> CycleCollection<'a, ForwardRefMarker> impl<'a, T, L: Location<'a>> CycleComplete<'a, ForwardRefMarker> for Singleton, Bounded> { - fn complete(self, ident: syn::Ident) { + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); self.location .flow_state() .borrow_mut() @@ -118,6 +129,46 @@ impl<'a, T, L: Location<'a>> CycleComplete<'a, ForwardRefMarker> } } +impl<'a, T, L: Location<'a> + NoTick, B> CycleCollection<'a, ForwardRefMarker> + for Singleton +{ + type Location = L; + + fn create_source(ident: syn::Ident, location: L) -> Self { + let location_id = location.id(); + Singleton::new( + location, + HfPlusNode::Persist(Box::new(HfPlusNode::CycleSource { + ident, + location_kind: location_id, + })), + ) + } +} + +impl<'a, T, L: Location<'a> + NoTick, B> CycleComplete<'a, ForwardRefMarker> + for Singleton +{ + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); + self.location + .flow_state() + .borrow_mut() + .leaves + .as_mut() + .expect(FLOW_USED_MESSAGE) + .push(HfPlusLeaf::CycleSink { + ident, + location_kind: self.location_kind(), + input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), + }); + } +} + impl<'a, T: Clone, L: Location<'a>, B> Clone for Singleton { fn clone(&self) -> Self { if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. 
}) { @@ -143,11 +194,6 @@ impl<'a, T: Clone, L: Location<'a>, B> Clone for Singleton { } impl<'a, T, L: Location<'a>, B> Singleton { - // TODO(shadaj): this is technically incorrect; we should only return the first element of the stream - pub fn into_stream(self) -> Stream { - Stream::new(self.location, self.ir_node.into_inner()) - } - pub fn map U + 'a>(self, f: impl IntoQuotedMut<'a, F, L>) -> Singleton { let f = f.splice_fn1_ctx(&self.location).into(); Singleton::new( @@ -159,7 +205,21 @@ impl<'a, T, L: Location<'a>, B> Singleton { ) } - pub fn flat_map, F: Fn(T) -> I + 'a>( + pub fn flat_map_ordered, F: Fn(T) -> I + 'a>( + self, + f: impl IntoQuotedMut<'a, F, L>, + ) -> Stream { + let f = f.splice_fn1_ctx(&self.location).into(); + Stream::new( + self.location, + HfPlusNode::FlatMap { + f, + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn flat_map_unordered, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream { @@ -225,55 +285,128 @@ impl<'a, T, L: Location<'a>, B> Singleton { ) } } -} -impl<'a, T, L: Location<'a>> Singleton { - pub fn continue_if(self, signal: Optional) -> Optional { + pub fn continue_if(self, signal: Optional) -> Optional + where + Self: ZipResult< + 'a, + Optional<(), L, Bounded>, + Location = L, + Out = Optional<(T, ()), L, Bounded>, + >, + { self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d)) } - pub fn continue_unless(self, other: Optional) -> Optional { + pub fn continue_unless(self, other: Optional) -> Optional + where + Singleton: ZipResult< + 'a, + Optional<(), L, Bounded>, + Location = L, + Out = Optional<(T, ()), L, Bounded>, + >, + { self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) } } -impl<'a, T, L: Location<'a> + NoTick, B> Singleton { - pub fn latest_tick(self, tick: &Tick) -> Singleton, Bounded> { +impl<'a, T, L: Location<'a> + NoTick, B> Singleton, B> { + /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton + /// as of that 
tick. The snapshot at tick `t + 1` is guaranteed to include at least all + /// relevant data that contributed to the snapshot at tick `t`. + /// + /// # Safety + /// Because this picks a snapshot of a singleton whose value is continuously changing, + /// the output singleton has a non-deterministic value since the snapshot can be at an + /// arbitrary point in time. + pub unsafe fn latest_tick(self) -> Singleton, Bounded> { Singleton::new( - tick.clone(), + self.location.tick, HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), ) } - pub fn tick_samples(self) -> Stream { + pub fn drop_timestamp(self) -> Optional { + Optional::new(self.location.tick.l, self.ir_node.into_inner()) + } +} + +impl<'a, T, L: Location<'a> + NoTick, B> Singleton { + pub fn timestamped(self, tick: &Tick) -> Singleton, B> { + Singleton::new( + Timestamped { tick: tick.clone() }, + self.ir_node.into_inner(), + ) + } + + /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots + /// with order corresponding to increasing prefixes of data contributing to the singleton. + /// + /// # Safety + /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due + /// to non-deterministic batching and arrival of inputs, the output stream is + /// non-deterministic. + pub unsafe fn sample_eager(self) -> Stream { let tick = self.location.tick(); - self.latest_tick(&tick).all_ticks() + + unsafe { + // SAFETY: source of intentional non-determinism + self.timestamped(&tick) + .latest_tick() + .all_ticks() + .drop_timestamp() + } } - pub fn sample_every( + /// Given a time interval, returns a stream corresponding to snapshots of the singleton + /// value taken at various points in time. Because the input singleton may be + /// [`Unbounded`], there are no guarantees on what these snapshots are other than they + /// represent the value of the singleton given some prefix of the streams leading up to + /// it. 
+ /// + /// # Safety + /// The output stream is non-deterministic in which elements are sampled, since this + /// is controlled by a clock. + pub unsafe fn sample_every( self, interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a, - ) -> Stream { - let samples = self.location.source_interval(interval); + ) -> Stream + where + L: NoTimestamp, + { + let samples = unsafe { + // SAFETY: source of intentional non-determinism + self.location.source_interval(interval) + }; let tick = self.location.tick(); - self.latest_tick(&tick) - .continue_if(samples.tick_batch(&tick).first()) - .all_ticks() + unsafe { + // SAFETY: source of intentional non-determinism + self.timestamped(&tick) + .latest_tick() + .continue_if(samples.timestamped(&tick).tick_batch().first()) + .all_ticks() + .drop_timestamp() + } } } impl<'a, T, L: Location<'a>> Singleton, Bounded> { - pub fn all_ticks(self) -> Stream { + pub fn all_ticks(self) -> Stream, Unbounded> { Stream::new( - self.location.outer().clone(), + Timestamped { + tick: self.location, + }, HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } - pub fn latest(self) -> Singleton { + pub fn latest(self) -> Singleton, Unbounded> { Singleton::new( - self.location.outer().clone(), + Timestamped { + tick: self.location, + }, HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } @@ -298,6 +431,10 @@ impl<'a, T, L: Location<'a>> Singleton, Bounded> { HfPlusNode::Delta(Box::new(self.ir_node.into_inner())), ) } + + pub fn into_stream(self) -> Stream, Bounded> { + Stream::new(self.location, self.ir_node.into_inner()) + } } pub trait ZipResult<'a, Other> { @@ -310,36 +447,78 @@ pub trait ZipResult<'a, Other> { fn make(location: Self::Location, ir_node: HfPlusNode) -> Self::Out; } -impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Singleton> for Singleton { - type Out = Singleton<(T, U), L, B>; - type Location = L; +impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Singleton, B>> + for Singleton, B> +{ + 
type Out = Singleton<(T, U), Timestamped, B>; + type Location = Timestamped; - fn other_location(other: &Singleton) -> L { + fn other_location(other: &Singleton, B>) -> Timestamped { other.location.clone() } - fn other_ir_node(other: Singleton) -> HfPlusNode { + fn other_ir_node(other: Singleton, B>) -> HfPlusNode { other.ir_node.into_inner() } - fn make(location: L, ir_node: HfPlusNode) -> Self::Out { + fn make(location: Timestamped, ir_node: HfPlusNode) -> Self::Out { Singleton::new(location, ir_node) } } -impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Optional> for Singleton { - type Out = Optional<(T, U), L, B>; - type Location = L; +impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Optional, B>> + for Singleton, B> +{ + type Out = Optional<(T, U), Timestamped, B>; + type Location = Timestamped; + + fn other_location(other: &Optional, B>) -> Timestamped { + other.location.clone() + } + + fn other_ir_node(other: Optional, B>) -> HfPlusNode { + other.ir_node.into_inner() + } + + fn make(location: Timestamped, ir_node: HfPlusNode) -> Self::Out { + Optional::new(location, ir_node) + } +} + +impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Singleton, B>> + for Singleton, B> +{ + type Out = Singleton<(T, U), Tick, B>; + type Location = Tick; + + fn other_location(other: &Singleton, B>) -> Tick { + other.location.clone() + } + + fn other_ir_node(other: Singleton, B>) -> HfPlusNode { + other.ir_node.into_inner() + } + + fn make(location: Tick, ir_node: HfPlusNode) -> Self::Out { + Singleton::new(location, ir_node) + } +} + +impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Optional, B>> + for Singleton, B> +{ + type Out = Optional<(T, U), Tick, B>; + type Location = Tick; - fn other_location(other: &Optional) -> L { + fn other_location(other: &Optional, B>) -> Tick { other.location.clone() } - fn other_ir_node(other: Optional) -> HfPlusNode { + fn other_ir_node(other: Optional, B>) -> HfPlusNode { other.ir_node.into_inner() } - fn 
make(location: L, ir_node: HfPlusNode) -> Self::Out { + fn make(location: Tick, ir_node: HfPlusNode) -> Self::Out { Optional::new(location, ir_node) } } diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index 2da21bf8e47..06db898941a 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -11,12 +11,14 @@ use serde::de::DeserializeOwned; use serde::Serialize; use stageleft::{q, IntoQuotedMut, QuotedWithContext}; use syn::parse_quote; +use tokio::time::Instant; use crate::builder::FLOW_USED_MESSAGE; use crate::cycle::{CycleCollection, CycleComplete, DeferTick, ForwardRefMarker, TickCycleMarker}; use crate::ir::{DebugInstantiate, HfPlusLeaf, HfPlusNode, TeeNode}; use crate::location::cluster::CLUSTER_SELF_ID; use crate::location::external_process::{ExternalBincodeStream, ExternalBytesPort}; +use crate::location::tick::{NoTimestamp, Timestamped}; use crate::location::{ check_matching_location, CanSend, ExternalProcess, Location, LocationId, NoTick, Tick, }; @@ -115,7 +117,12 @@ impl<'a, T, L: Location<'a>, Order> CycleCollection<'a, TickCycleMarker> impl<'a, T, L: Location<'a>, Order> CycleComplete<'a, TickCycleMarker> for Stream, Bounded, Order> { - fn complete(self, ident: syn::Ident) { + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); self.location .flow_state() .borrow_mut() @@ -150,7 +157,12 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleCollection<'a, ForwardRefMa impl<'a, T, L: Location<'a> + NoTick, B, Order> CycleComplete<'a, ForwardRefMarker> for Stream { - fn complete(self, ident: syn::Ident) { + fn complete(self, ident: syn::Ident, expected_location: LocationId) { + assert_eq!( + self.location.id(), + expected_location, + "locations do not match" + ); self.location .flow_state() .borrow_mut() @@ -221,7 +233,7 @@ impl<'a, T, L: Location<'a>, B, Order> Stream { self.map(q!(|d| d.clone())) } - 
pub fn flat_map, F: Fn(T) -> I + 'a>( + pub fn flat_map_ordered, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F, L>, ) -> Stream { @@ -235,11 +247,32 @@ impl<'a, T, L: Location<'a>, B, Order> Stream { ) } - pub fn flatten(self) -> Stream + pub fn flat_map_unordered, F: Fn(T) -> I + 'a>( + self, + f: impl IntoQuotedMut<'a, F, L>, + ) -> Stream { + let f = f.splice_fn1_ctx(&self.location).into(); + Stream::new( + self.location, + HfPlusNode::FlatMap { + f, + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn flatten_ordered(self) -> Stream + where + T: IntoIterator, + { + self.flat_map_ordered(q!(|d| d)) + } + + pub fn flatten_unordered(self) -> Stream where T: IntoIterator, { - self.flat_map(q!(|d| d)) + self.flat_map_unordered(q!(|d| d)) } pub fn filter bool + 'a>( @@ -366,7 +399,15 @@ impl<'a, T, L: Location<'a>, B, Order> Stream { } } - pub fn assume_ordering(self) -> Stream { + /// Explicitly "casts" the stream to a type with a different ordering + /// guarantee. Useful in unsafe code where the ordering cannot be proven + /// by the type-system. + /// + /// # Safety + /// This function is used as an escape hatch, and any mistakes in the + /// provided ordering guarantee will propogate into the guarantees + /// for the rest of the program. + pub unsafe fn assume_ordering(self) -> Stream { Stream::new(self.location, self.ir_node.into_inner()) } } @@ -558,15 +599,21 @@ impl<'a, T, L: Location<'a>> Stream { } } -impl<'a, T, L: Location<'a> + NoTick> Stream { +impl<'a, T, L: Location<'a> + NoTick + NoTimestamp> Stream { pub fn union( self, other: Stream, ) -> Stream { let tick = self.location.tick(); - self.tick_batch(&tick) - .union(other.tick_batch(&tick)) - .all_ticks() + unsafe { + // SAFETY: Because the inputs and outputs are unordered, + // we can interleave batches from both streams. 
+ self.timestamped(&tick) + .tick_batch() + .union(other.timestamped(&tick).tick_batch()) + .all_ticks() + .drop_timestamp() + } } } @@ -704,32 +751,112 @@ impl<'a, K: Eq + Hash, V, L: Location<'a>, Order> Stream<(K, V), Tick, Bounde } } -impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { - pub fn tick_batch(self, tick: &Tick) -> Stream, Bounded, Order> { +impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream, B, Order> { + /// Given a tick, returns a stream corresponding to a batch of elements for that tick. + /// These batches are guaranteed to be contiguous across ticks and preserve the order + /// of the input. + /// + /// # Safety + /// The batch boundaries are non-deterministic and may change across executions. + pub unsafe fn tick_batch(self) -> Stream, Bounded, Order> { Stream::new( - tick.clone(), + self.location.tick, HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), ) } - pub fn tick_prefix(self, tick: &Tick) -> Stream, Bounded, Order> - where - T: Clone, - { - self.tick_batch(tick).persist() + pub fn drop_timestamp(self) -> Stream { + Stream::new(self.location.tick.l, self.ir_node.into_inner()) } - pub fn sample_every( + pub fn timestamp_source(&self) -> Tick { + self.location.tick.clone() + } +} + +impl<'a, T, L: Location<'a> + NoTick + NoTimestamp, B, Order> Stream { + pub fn timestamped(self, tick: &Tick) -> Stream, B, Order> { + Stream::new( + Timestamped { tick: tick.clone() }, + self.ir_node.into_inner(), + ) + } + + /// Given a time interval, returns a stream corresponding to samples taken from the + /// stream roughly at that interval. The output will have elements in the same order + /// as the input, but with arbitrary elements skipped between samples. There is also + /// no guarantee on the exact timing of the samples. + /// + /// # Safety + /// The output stream is non-deterministic in which elements are sampled, since this + /// is controlled by a clock. 
+ pub unsafe fn sample_every( self, interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a, ) -> Stream { - let samples = self.location.source_interval(interval); + let samples = unsafe { + // SAFETY: source of intentional non-determinism + self.location.source_interval(interval) + }; + let tick = self.location.tick(); - self.tick_batch(&tick) - .continue_if(samples.tick_batch(&tick).first()) - .all_ticks() + unsafe { + // SAFETY: source of intentional non-determinism + self.timestamped(&tick) + .tick_batch() + .continue_if(samples.timestamped(&tick).tick_batch().first()) + .all_ticks() + .drop_timestamp() + } } + /// Given a timeout duration, returns an [`Optional`] which will have a value if the + /// stream has not emitted a value since that duration. + /// + /// # Safety + /// Timeout relies on non-deterministic sampling of the stream, so depending on when + /// samples take place, timeouts may be non-deterministically generated or missed, + /// and the notification of the timeout may be delayed as well. There is also no + /// guarantee on how long the [`Optional`] will have a value after the timeout is + /// detected based on when the next sample is taken. + pub unsafe fn timeout( + self, + duration: impl QuotedWithContext<'a, std::time::Duration, Tick> + Copy + 'a, + ) -> Optional<(), L, Unbounded> + where + Order: MinOrder, + { + let tick = self.location.tick(); + + let latest_received = self.fold_commutative( + q!(|| None), + q!(|latest, _| { + // Note: May want to check received ballot against our own? + *latest = Some(Instant::now()); + }), + ); + + unsafe { + // SAFETY: Non-deterministic delay in detecting a timeout is expected. 
+ latest_received.timestamped(&tick).latest_tick() + } + .filter_map(q!(move |latest_received| { + if let Some(latest_received) = latest_received { + if Instant::now().duration_since(latest_received) > duration { + Some(()) + } else { + None + } + } else { + Some(()) + } + })) + .latest() + .drop_timestamp() + } +} + +impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { pub fn for_each(self, f: impl IntoQuotedMut<'a, F, L>) { let f = f.splice_fn1_ctx(&self.location).into(); self.location @@ -762,9 +889,11 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { } impl<'a, T, L: Location<'a>, Order> Stream, Bounded, Order> { - pub fn all_ticks(self) -> Stream { + pub fn all_ticks(self) -> Stream, Unbounded, Order> { Stream::new( - self.location.outer().clone(), + Timestamped { + tick: self.location.clone(), + }, HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } @@ -848,12 +977,17 @@ impl<'a, T, C1, B, Order> Stream, B, Order> { Order: MinOrder< as CanSend<'a, Cluster<'a, C2>>>::OutStrongestOrder>, { - self.map(q!(move |b| ( - ClusterId::from_raw(CLUSTER_SELF_ID.raw_id), - b.clone() - ))) - .send_bincode_interleaved(other) - .assume_ordering() // this is safe because we are mapping clusters 1:1 + let sent = self + .map(q!(move |b| ( + ClusterId::from_raw(CLUSTER_SELF_ID.raw_id), + b.clone() + ))) + .send_bincode_interleaved(other); + + unsafe { + // SAFETY: this is safe because we are mapping clusters 1:1 + sent.assume_ordering() + } } } @@ -863,9 +997,12 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { other: &Process<'a, P2>, ) -> Stream, Unbounded, Order> where - L: CanSend<'a, Process<'a, P2>, In = T, Out = T>, + L::Root: CanSend<'a, Process<'a, P2>, In = T, Out = T>, T: Clone + Serialize + DeserializeOwned, - Order: MinOrder, Min = Order>, + Order: MinOrder< + >>::OutStrongestOrder, + Min = Order, + >, { self.send_bincode::, T>(other) } @@ -873,20 +1010,20 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { pub fn 
send_bincode, CoreType>( self, other: &L2, - ) -> Stream, L2, Unbounded, Order::Min> + ) -> Stream<>::Out, L2, Unbounded, Order::Min> where - L: CanSend<'a, L2, In = T>, + L::Root: CanSend<'a, L2, In = T>, CoreType: Serialize + DeserializeOwned, - Order: MinOrder>, + Order: MinOrder<>::OutStrongestOrder>, { - let serialize_pipeline = Some(serialize_bincode::(L::is_demux())); + let serialize_pipeline = Some(serialize_bincode::(L::Root::is_demux())); - let deserialize_pipeline = Some(deserialize_bincode::(L::tagged_type())); + let deserialize_pipeline = Some(deserialize_bincode::(L::Root::tagged_type())); Stream::new( other.clone(), HfPlusNode::Network { - from_location: self.location_kind(), + from_location: self.location.root().id(), from_key: None, to_location: other.id(), to_key: None, @@ -921,7 +1058,7 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { leaves.push(HfPlusLeaf::ForEach { f: dummy_f.into(), input: Box::new(HfPlusNode::Network { - from_location: self.location_kind(), + from_location: self.location.root().id(), from_key: None, to_location: other.id(), to_key: Some(external_key), @@ -942,22 +1079,22 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { pub fn send_bytes>( self, other: &L2, - ) -> Stream, L2, Unbounded, Order::Min> + ) -> Stream<>::Out, L2, Unbounded, Order::Min> where - L: CanSend<'a, L2, In = T>, - Order: MinOrder>, + L::Root: CanSend<'a, L2, In = T>, + Order: MinOrder<>::OutStrongestOrder>, { let root = get_this_crate(); Stream::new( other.clone(), HfPlusNode::Network { - from_location: self.location_kind(), + from_location: self.location.root().id(), from_key: None, to_location: other.id(), to_key: None, serialize_pipeline: None, instantiate_fn: DebugInstantiate::Building(), - deserialize_pipeline: if let Some(c_type) = L::tagged_type() { + deserialize_pipeline: if let Some(c_type) = L::Root::tagged_type() { Some( parse_quote!(map(|(id, b)| (#root::ClusterId<#c_type>::from_raw(id), b.unwrap().freeze()))), ) @@ -971,7 
+1108,7 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { pub fn send_bytes_external(self, other: &ExternalProcess) -> ExternalBytesPort where - L: CanSend<'a, ExternalProcess<'a, L2>, In = T, Out = Bytes>, + L::Root: CanSend<'a, ExternalProcess<'a, L2>, In = T, Out = Bytes>, { let mut flow_state_borrow = self.location.flow_state().borrow_mut(); let external_key = flow_state_borrow.next_external_out; @@ -984,7 +1121,7 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { leaves.push(HfPlusLeaf::ForEach { f: dummy_f.into(), input: Box::new(HfPlusNode::Network { - from_location: self.location_kind(), + from_location: self.location.root().id(), from_key: None, to_location: other.id(), to_key: Some(external_key), @@ -1006,9 +1143,9 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { other: &L2, ) -> Stream where - L: CanSend<'a, L2, In = T, Out = (Tag, CoreType)>, + L::Root: CanSend<'a, L2, In = T, Out = (Tag, CoreType)>, CoreType: Serialize + DeserializeOwned, - Order: MinOrder>, + Order: MinOrder<>::OutStrongestOrder>, { self.send_bincode::(other).map(q!(|(_, b)| b)) } @@ -1018,24 +1155,30 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { other: &L2, ) -> Stream where - L: CanSend<'a, L2, In = T, Out = (Tag, Bytes)>, - Order: MinOrder>, + L::Root: CanSend<'a, L2, In = T, Out = (Tag, Bytes)>, + Order: MinOrder<>::OutStrongestOrder>, { self.send_bytes::(other).map(q!(|(_, b)| b)) } + #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")] pub fn broadcast_bincode( self, other: &Cluster<'a, C2>, - ) -> Stream, Cluster<'a, C2>, Unbounded, Order::Min> + ) -> Stream< + >>::Out, + Cluster<'a, C2>, + Unbounded, + Order::Min, + > where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)>, + L::Root: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)>, T: Clone + Serialize + DeserializeOwned, - Order: MinOrder>, + Order: MinOrder<>>::OutStrongestOrder>, { let ids = other.members(); - self.flat_map(q!(|b| 
ids.iter().map(move |id| ( + self.flat_map_ordered(q!(|b| ids.iter().map(move |id| ( ::std::clone::Clone::clone(id), ::std::clone::Clone::clone(&b) )))) @@ -1047,25 +1190,31 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { other: &Cluster<'a, C2>, ) -> Stream, Unbounded, Order::Min> where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)> + 'a, + L::Root: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)> + 'a, T: Clone + Serialize + DeserializeOwned, - Order: MinOrder>, + Order: MinOrder<>>::OutStrongestOrder>, { self.broadcast_bincode(other).map(q!(|(_, b)| b)) } + #[expect(clippy::type_complexity, reason = "ordering semantics for broadcast")] pub fn broadcast_bytes( self, other: &Cluster<'a, C2>, - ) -> Stream, Cluster<'a, C2>, Unbounded, Order::Min> + ) -> Stream< + >>::Out, + Cluster<'a, C2>, + Unbounded, + Order::Min, + > where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)> + 'a, + L::Root: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)> + 'a, T: Clone, - Order: MinOrder>, + Order: MinOrder<>>::OutStrongestOrder>, { let ids = other.members(); - self.flat_map(q!(|b| ids.iter().map(move |id| ( + self.flat_map_ordered(q!(|b| ids.iter().map(move |id| ( ::std::clone::Clone::clone(id), ::std::clone::Clone::clone(&b) )))) @@ -1077,10 +1226,10 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream { other: &Cluster<'a, C2>, ) -> Stream, Unbounded, Order::Min> where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, Bytes)> + L::Root: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, Bytes)> + 'a, T: Clone, - Order: MinOrder>, + Order: MinOrder<>>::OutStrongestOrder>, { self.broadcast_bytes(other).map(q!(|(_, b)| b)) } @@ -1092,15 +1241,18 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { self, other: &Cluster<'a, C2>, ) -> Stream< - L::Out, + >>::Out, Cluster<'a, C2>, Unbounded, - >>::Min, + >>::OutStrongestOrder, + >>::Min, > where - L: CanSend<'a, Cluster<'a, C2>, In = 
(ClusterId, T)>, + L::Root: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)>, T: Clone + Serialize + DeserializeOwned, - TotalOrder: MinOrder>, + TotalOrder: + MinOrder<>>::OutStrongestOrder>, { let ids = other.members(); @@ -1116,12 +1268,15 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { T, Cluster<'a, C2>, Unbounded, - >>::Min, + >>::OutStrongestOrder, + >>::Min, > where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)> + 'a, + L::Root: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)> + 'a, T: Clone + Serialize + DeserializeOwned, - TotalOrder: MinOrder>, + TotalOrder: + MinOrder<>>::OutStrongestOrder>, { self.round_robin_bincode(other).map(q!(|(_, b)| b)) } @@ -1130,15 +1285,18 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { self, other: &Cluster<'a, C2>, ) -> Stream< - L::Out, + >>::Out, Cluster<'a, C2>, Unbounded, - >>::Min, + >>::OutStrongestOrder, + >>::Min, > where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)> + 'a, + L::Root: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)> + 'a, T: Clone, - TotalOrder: MinOrder>, + TotalOrder: + MinOrder<>>::OutStrongestOrder>, { let ids = other.members(); @@ -1154,13 +1312,16 @@ impl<'a, T, L: Location<'a> + NoTick, B> Stream { Bytes, Cluster<'a, C2>, Unbounded, - >>::Min, + >>::OutStrongestOrder, + >>::Min, > where - L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, Bytes)> + L::Root: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, Bytes)> + 'a, T: Clone, - TotalOrder: MinOrder>, + TotalOrder: + MinOrder<>>::OutStrongestOrder>, { self.round_robin_bytes(other).map(q!(|(_, b)| b)) } diff --git a/hydroflow_plus_test/src/cluster/compute_pi.rs b/hydroflow_plus_test/src/cluster/compute_pi.rs index 00bf90b5348..0eff334e716 100644 --- a/hydroflow_plus_test/src/cluster/compute_pi.rs +++ b/hydroflow_plus_test/src/cluster/compute_pi.rs @@ -29,21 +29,25 @@ pub fn compute_pi<'a>( ) .all_ticks(); - trials + let estimate = trials 
.send_bincode_interleaved(&process) .reduce_commutative(q!(|(inside, total), (inside_batch, total_batch)| { *inside += inside_batch; *total += total_batch; - })) - .sample_every(q!(Duration::from_secs(1))) - .for_each(q!(|(inside, total)| { - println!( - "pi: {} ({} trials)", - 4.0 * inside as f64 / total as f64, - total - ); })); + unsafe { + // SAFETY: intentional non-determinism + estimate.sample_every(q!(Duration::from_secs(1))) + } + .for_each(q!(|(inside, total)| { + println!( + "pi: {} ({} trials)", + 4.0 * inside as f64 / total as f64, + total + ); + })); + (cluster, process) } diff --git a/hydroflow_plus_test/src/cluster/map_reduce.rs b/hydroflow_plus_test/src/cluster/map_reduce.rs index a46e1d0d401..42d72a22188 100644 --- a/hydroflow_plus_test/src/cluster/map_reduce.rs +++ b/hydroflow_plus_test/src/cluster/map_reduce.rs @@ -11,22 +11,32 @@ pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<' .source_iter(q!(vec!["abc", "abc", "xyz", "abc"])) .map(q!(|s| s.to_string())); - words + let partitioned_words = words .round_robin_bincode(&cluster) - .map(q!(|string| (string, ()))) - .tick_batch(&cluster.tick()) - .fold_keyed(q!(|| 0), q!(|count, _| *count += 1)) - .inspect(q!(|(string, count)| println!( - "partition count: {} - {}", - string, count - ))) - .all_ticks() - .send_bincode_interleaved(&process) - .tick_batch(&process.tick()) - .persist() - .reduce_keyed_commutative(q!(|total, count| *total += count)) - .all_ticks() - .for_each(q!(|(string, count)| println!("{}: {}", string, count))); + .map(q!(|string| (string, ()))); + + let batches = unsafe { + // SAFETY: addition is associative so we can batch reduce + partitioned_words.timestamped(&cluster.tick()).tick_batch() + } + .fold_keyed(q!(|| 0), q!(|count, _| *count += 1)) + .inspect(q!(|(string, count)| println!( + "partition count: {} - {}", + string, count + ))) + .all_ticks() + .send_bincode_interleaved(&process); + + unsafe { + // SAFETY: addition is associative so we can 
batch reduce + batches + .timestamped(&process.tick()) + .tick_batch() + .persist() + .reduce_keyed_commutative(q!(|total, count| *total += count)) + } + .all_ticks() + .for_each(q!(|(string, count)| println!("{}: {}", string, count))); (process, cluster) } diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index 875d3626dd7..e06628c2884 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -4,12 +4,12 @@ use std::hash::Hash; use std::time::Duration; use hydroflow_plus::*; +use location::tick::Timestamped; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use stream::NoOrder; -use tokio::time::Instant; -use super::quorum::collect_quorum; +use super::quorum::{collect_quorum, collect_quorum_with_response}; use super::request_response::join_responses; pub struct Proposer {} @@ -51,12 +51,28 @@ struct P2a

{ value: Option

, // might be a re-committed hole } +/// Implements the core Paxos algorithm, which uses a cluster of propsers and acceptors +/// to sequence payloads being sent to the proposers. +/// +/// Proposers that currently are the leader will work with acceptors to sequence incoming +/// payloads, but may drop payloads if they are not the lader or lose leadership during +/// the commit process. +/// +/// Returns a stream of ballots, where new values are emitted when a new leader is elected, +/// and a stream of sequenced payloads with an index and optional payload (in the case of +/// holes in the log). +/// +/// # Safety +/// When the leader is stable, the algorithm will commit incoming payloads to the leader +/// in deterministic order. However, when the leader is changing, payloads may be +/// non-deterministically dropped. The stream of ballots is also non-deterministic because +/// leaders are elected in a non-deterministic process. #[expect( clippy::too_many_arguments, clippy::type_complexity, reason = "internal paxos code // TODO" )] -pub fn paxos_core<'a, P: PaxosPayload, R>( +pub unsafe fn paxos_core<'a, P: PaxosPayload, R>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, r_to_acceptors_checkpoint: Stream< @@ -90,43 +106,64 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( let (a_log_complete_cycle, a_log_forward_reference) = acceptor_tick.forward_ref::>(); - let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election( - proposers, - acceptors, - &proposer_tick, - &acceptor_tick, - f, - i_am_leader_send_timeout, - i_am_leader_check_timeout, - i_am_leader_check_timeout_delay_multiplier, - sequencing_max_ballot_forward_reference, - a_log_forward_reference, - ); + let (p_ballot, p_is_leader, p_relevant_p1bs, a_max_ballot) = unsafe { + // SAFETY: The primary non-determinism exposed by leader election algorithm lies in which leader + // is elected, which affects both the ballot at each proposer and the leader flag. 
But using a stale ballot + // or leader flag will only lead to failure in sequencing rather than commiting the wrong value. Because + // ballots are non-deterministic, the acceptor max ballot is also non-deterministic, although we are + // guaranteed that the max ballot will match the current ballot of a proposer who believes they are the leader. + leader_election( + proposers, + acceptors, + &proposer_tick, + &acceptor_tick, + f, + i_am_leader_send_timeout, + i_am_leader_check_timeout, + i_am_leader_check_timeout_delay_multiplier, + sequencing_max_ballot_forward_reference, + a_log_forward_reference, + ) + }; let just_became_leader = p_is_leader .clone() .continue_unless(p_is_leader.clone().defer_tick()); - let (p_to_replicas, a_log, sequencing_max_ballots) = sequence_payload( - proposers, - acceptors, - &proposer_tick, - &acceptor_tick, - c_to_proposers, - r_to_acceptors_checkpoint, - p_ballot.clone(), - p_is_leader, - p_relevant_p1bs, - f, - a_max_ballot, - ); - - a_log_complete_cycle.complete(a_log); + let (p_to_replicas, a_log, sequencing_max_ballots) = unsafe { + // SAFETY: The relevant p1bs are non-deterministic because they come from a arbitrary quorum, but because + // we use a quorum, if we remain the leader there are no missing committed values when we combine the logs. + // The remaining non-determinism is in when incoming payloads are batched versus the leader flag and state + // of acceptors, which in the worst case will lead to dropped payloads as documented. + sequence_payload( + proposers, + acceptors, + &proposer_tick, + &acceptor_tick, + c_to_proposers, + r_to_acceptors_checkpoint, + p_ballot.clone(), + p_is_leader, + p_relevant_p1bs, + f, + a_max_ballot, + ) + }; + + a_log_complete_cycle.complete(unsafe { + // SAFETY: We will always write payloads to the log before acknowledging them to the proposers, + // which guarantees that if the leader changes the quorum overlap between sequencing and leader + // election will include the committed value. 
+ a_log.latest_tick() + }); sequencing_max_ballot_complete_cycle.complete(sequencing_max_ballots); ( // Only tell the clients once when leader election concludes - just_became_leader.then(p_ballot).all_ticks(), + just_became_leader + .then(p_ballot) + .all_ticks() + .drop_timestamp(), p_to_replicas, ) } @@ -136,7 +173,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( clippy::too_many_arguments, reason = "internal paxos code // TODO" )] -fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( +unsafe fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, proposer_tick: &Tick>, @@ -172,20 +209,27 @@ fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( proposer_id: ClusterId::from_raw(0) }))); - let (p_ballot, p_has_largest_ballot) = p_ballot_calc( - proposer_tick, - p_received_max_ballot.latest_tick(proposer_tick), - ); - - let (p_to_proposers_i_am_leader, p_trigger_election) = p_leader_heartbeat( - proposers, - proposer_tick, - p_is_leader_forward_ref, - p_ballot.clone(), - i_am_leader_send_timeout, - i_am_leader_check_timeout, - i_am_leader_check_timeout_delay_multiplier, - ); + let (p_ballot, p_has_largest_ballot) = p_ballot_calc(proposer_tick, unsafe { + // SAFETY: A stale max ballot might result in us failing to become the leader, but which proposer + // becomes the leader is non-deterministic anyway. + p_received_max_ballot + .timestamped(proposer_tick) + .latest_tick() + }); + + let (p_to_proposers_i_am_leader, p_trigger_election) = unsafe { + // SAFETY: non-determinism in heartbeats may lead to additional leader election attempts, which + // is propagated to the non-determinism of which leader is elected. 
+ p_leader_heartbeat( + proposers, + proposer_tick, + p_is_leader_forward_ref, + p_ballot.clone(), + i_am_leader_send_timeout, + i_am_leader_check_timeout, + i_am_leader_check_timeout_delay_multiplier, + ) + }; p_to_proposers_i_am_leader_complete_cycle.complete(p_to_proposers_i_am_leader); @@ -195,8 +239,17 @@ fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( .inspect(q!(|_| println!("Proposer leader expired, sending P1a"))) .broadcast_bincode_interleaved(acceptors); - let (a_max_ballot, a_to_proposers_p1b) = - acceptor_p1(acceptor_tick, p_to_acceptors_p1a, a_log, proposers); + let (a_max_ballot, a_to_proposers_p1b) = acceptor_p1( + acceptor_tick, + unsafe { + // SAFETY: Non-deterministic batching may result in different payloads being rejected + // by an acceptor if the payload is batched with another payload with larger ballot. + // But as documented, payloads may be non-deterministically dropped during leader election. + p_to_acceptors_p1a.timestamped(acceptor_tick).tick_batch() + }, + a_log, + proposers, + ); let (p_is_leader, p_accepted_values, fail_ballots) = p_p1b( proposer_tick, @@ -206,7 +259,7 @@ fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( f, ); p_is_leader_complete_cycle.complete(p_is_leader.clone()); - p1b_fail_complete.complete(fail_ballots.all_ticks()); + p1b_fail_complete.complete(fail_ballots.drop_timestamp()); (p_ballot, p_is_leader, p_accepted_values, a_max_ballot) } @@ -257,35 +310,8 @@ fn p_ballot_calc<'a>( (p_ballot, p_has_largest_ballot) } -fn p_leader_expired<'a>( - proposer_tick: &Tick>, - p_to_proposers_i_am_leader: Stream, Unbounded, NoOrder>, - p_is_leader: Optional<(), Tick>, Bounded>, - i_am_leader_check_timeout: u64, // How often to check if heartbeat expired -) -> Optional, Tick>, Bounded> { - let p_latest_received_i_am_leader = p_to_proposers_i_am_leader.clone().fold_commutative( - q!(|| None), - q!(|latest, _| { - // Note: May want to check received ballot against our own? 
- *latest = Some(Instant::now()); - }), - ); - - p_latest_received_i_am_leader - .latest_tick(proposer_tick) - .continue_unless(p_is_leader) - .filter(q!(move |latest_received_i_am_leader| { - if let Some(latest_received_i_am_leader) = latest_received_i_am_leader { - (Instant::now().duration_since(*latest_received_i_am_leader)) - > Duration::from_secs(i_am_leader_check_timeout) - } else { - true - } - })) -} - #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] -fn p_leader_heartbeat<'a>( +unsafe fn p_leader_heartbeat<'a>( proposers: &Cluster<'a, Proposer>, proposer_tick: &Tick>, p_is_leader: Optional<(), Tick>, Bounded>, @@ -295,49 +321,66 @@ fn p_leader_heartbeat<'a>( i_am_leader_check_timeout_delay_multiplier: usize, /* Initial delay, multiplied by proposer pid, to stagger proposers checking for timeouts */ ) -> ( Stream, Unbounded, NoOrder>, - Optional, Tick>, Bounded>, + Optional<(), Tick>, Bounded>, ) { - let p_to_proposers_i_am_leader = p_is_leader - .clone() - .then(p_ballot) - .latest() - .sample_every(q!(Duration::from_secs(i_am_leader_send_timeout))) - .broadcast_bincode_interleaved(proposers); + let p_to_proposers_i_am_leader = unsafe { + // SAFETY: Delays in heartbeats may lead to leader election attempts even + // if the leader is alive. This will result in the previous leader receiving + // larger ballots from its peers and it will drop its leadership. + p_is_leader + .clone() + .then(p_ballot) + .latest() + .drop_timestamp() + .sample_every(q!(Duration::from_secs(i_am_leader_send_timeout))) + } + .broadcast_bincode_interleaved(proposers); - let p_leader_expired = p_leader_expired( - proposer_tick, - p_to_proposers_i_am_leader.clone(), - p_is_leader, - i_am_leader_check_timeout, - ); + let p_leader_expired = unsafe { + // Delayed timeouts only affect which leader wins re-election. If the leadership flag + // is gained after timeout correctly ignore the timeout. 
If the flag is lost after + // timeout we correctly attempt to become the leader. + p_to_proposers_i_am_leader + .clone() + .timeout(q!(Duration::from_secs(i_am_leader_check_timeout))) + .timestamped(proposer_tick) + .latest_tick() + .continue_unless(p_is_leader) + }; // Add random delay depending on node ID so not everyone sends p1a at the same time - let p_trigger_election = p_leader_expired.continue_if( - proposers - .source_interval_delayed( - q!(Duration::from_secs( - (CLUSTER_SELF_ID.raw_id * i_am_leader_check_timeout_delay_multiplier as u32) - .into() - )), - q!(Duration::from_secs(i_am_leader_check_timeout)), - ) - .tick_batch(proposer_tick) - .first(), - ); + let p_trigger_election = unsafe { + // SAFETY: If the leader "un-expires" due to non-determinstic delay, we return + // to a stable leader state. If the leader remains expired, non-deterministic + // delay is propagated to the non-determinism of which leader is elected. + p_leader_expired.continue_if( + proposers + .source_interval_delayed( + q!(Duration::from_secs( + (CLUSTER_SELF_ID.raw_id + * i_am_leader_check_timeout_delay_multiplier as u32) + .into() + )), + q!(Duration::from_secs(i_am_leader_check_timeout)), + ) + .timestamped(proposer_tick) + .tick_batch() + .first(), + ) + }; (p_to_proposers_i_am_leader, p_trigger_election) } #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>( acceptor_tick: &Tick>, - p_to_acceptors_p1a: Stream, Unbounded, NoOrder>, + p_to_acceptors_p1a: Stream>, Bounded, NoOrder>, a_log: Singleton>, Bounded>, proposers: &Cluster<'a, Proposer>, ) -> ( Singleton>, Bounded>, Stream<(Ballot, Result), Cluster<'a, Proposer>, Unbounded, NoOrder>, ) { - let p_to_acceptors_p1a = p_to_acceptors_p1a.tick_batch(acceptor_tick); let a_max_ballot = p_to_acceptors_p1a .clone() .inspect(q!(|p1a| println!("Acceptor received P1a: {:?}", p1a))) @@ -385,32 +428,36 @@ fn p_p1b<'a, P: Clone + Serialize + 
DeserializeOwned>( ) -> ( Optional<(), Tick>, Bounded>, Stream>, Bounded, NoOrder>, - Stream>, Bounded, NoOrder>, + Stream>, Unbounded, NoOrder>, ) { - let (quorums, fails) = collect_quorum( - proposer_tick, - a_to_proposers_p1b.tick_batch(proposer_tick), + let (quorums, fails) = collect_quorum_with_response( + a_to_proposers_p1b.timestamped(proposer_tick), f + 1, 2 * f + 1, ); - let p_received_quorum_of_p1bs = quorums - .persist() - .fold_keyed_commutative( - q!(|| vec![]), - q!(|logs, log| { - logs.push(log); - }), - ) - .max_by_key(q!(|t| t.0)) - .zip(p_ballot.clone()) - .filter_map(q!( - move |((quorum_ballot, quorum_accepted), my_ballot)| if quorum_ballot == my_ballot { - Some(quorum_accepted) - } else { - None - } - )); + let p_received_quorum_of_p1bs = unsafe { + // SAFETY: All the values for a quorum will be emitted in a single batch, + // so we will not split up the quorum. + quorums.tick_batch() + } + .persist() + .fold_keyed_commutative( + q!(|| vec![]), + q!(|logs, log| { + // even though this is non-commutative, we use `flatten_unordered` later + logs.push(log); + }), + ) + .max_by_key(q!(|t| t.0)) + .zip(p_ballot.clone()) + .filter_map(q!( + move |((quorum_ballot, quorum_accepted), my_ballot)| if quorum_ballot == my_ballot { + Some(quorum_accepted) + } else { + None + } + )); let p_is_leader = p_received_quorum_of_p1bs .clone() @@ -419,8 +466,8 @@ fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>( ( p_is_leader, - // The fold was not commutative, so this is unordered. 
- p_received_quorum_of_p1bs.flatten().assume_ordering(), + // we used an unordered accumulator, so flattened has no order + p_received_quorum_of_p1bs.flatten_unordered(), fails.map(q!(|(_, ballot)| ballot)), ) } @@ -440,7 +487,7 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>( Optional>, Bounded>, ) { let p_p1b_highest_entries_and_count = accepted_logs - .flatten() // Convert HashMap log back to stream + .flatten_unordered() // Convert HashMap log back to stream .fold_keyed_commutative::<(usize, Option>), _, _>(q!(|| (0, None)), q!(|curr_entry, new_entry| { if let Some(curr_entry_payload) = &mut curr_entry.1 { let same_values = new_entry.value == curr_entry_payload.value; @@ -486,7 +533,7 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>( .map(q!(|(slot, _)| slot)); let p_log_holes = p_max_slot .clone() - .flat_map(q!(|max_slot| 0..max_slot)) + .flat_map_ordered(q!(|max_slot| 0..max_slot)) .filter_not_in(p_proposed_slots) .cross_singleton(p_ballot.clone()) .map(q!(|(slot, ballot)| P2a { @@ -503,7 +550,7 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>( clippy::too_many_arguments, reason = "internal paxos code // TODO" )] -fn sequence_payload<'a, P: PaxosPayload, R>( +unsafe fn sequence_payload<'a, P: PaxosPayload, R>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, proposer_tick: &Tick>, @@ -530,18 +577,23 @@ fn sequence_payload<'a, P: PaxosPayload, R>( a_max_ballot: Singleton>, Bounded>, ) -> ( Stream<(usize, Option

), Cluster<'a, Proposer>, Unbounded, NoOrder>, - Singleton>, Tick>, Bounded>, + Singleton>, Timestamped>, Unbounded>, Stream, Unbounded, NoOrder>, ) { let (p_log_to_recommit, p_max_slot) = recommit_after_leader_election(p_relevant_p1bs, p_ballot.clone(), f); - let indexed_payloads = index_payloads( - proposer_tick, - p_max_slot, - c_to_proposers, - p_is_leader.clone(), - ); + let indexed_payloads = index_payloads(proposer_tick, p_max_slot, unsafe { + // SAFETY: We batch payloads so that we can compute the correct slot based on + // base slot. In the case of a leader re-election, the base slot is updated which + // affects the computed payload slots. This non-determinism can lead to non-determinism + // in which payloads are committed when the leader is changing, which is documented at + // the function level. + c_to_proposers + .timestamped(proposer_tick) + .tick_batch() + .continue_if(p_is_leader.clone()) + }); let payloads_to_send = indexed_payloads .cross_singleton(p_ballot.clone()) @@ -550,14 +602,14 @@ fn sequence_payload<'a, P: PaxosPayload, R>( Some(payload) ))) .union(p_log_to_recommit.map(q!(|p2a| ((p2a.slot, p2a.ballot), p2a.value)))) - .continue_if(p_is_leader); + .continue_if(p_is_leader) + .all_ticks(); let (a_log, a_to_proposers_p2b) = acceptor_p2( acceptor_tick, a_max_ballot.clone(), payloads_to_send .clone() - .all_ticks() .map(q!(|((slot, ballot), value)| P2a { ballot, slot, @@ -571,23 +623,23 @@ fn sequence_payload<'a, P: PaxosPayload, R>( // TOOD: only persist if we are the leader let (quorums, fails) = collect_quorum( - proposer_tick, - a_to_proposers_p2b.clone().tick_batch(proposer_tick), + a_to_proposers_p2b.timestamped(proposer_tick), f + 1, 2 * f + 1, ); - let p_to_replicas = join_responses( - proposer_tick, - quorums.keys().map(q!(|k| (k, ()))), - payloads_to_send, - ) - .all_ticks(); + let p_to_replicas = join_responses(proposer_tick, quorums.map(q!(|k| (k, ()))), unsafe { + // SAFETY: The metadata will always be generated before we get a 
quorum + // because `payloads_to_send` is used to send the payloads to acceptors. + payloads_to_send.tick_batch() + }); ( - p_to_replicas.map(q!(|((slot, _ballot), (value, _))| (slot, value))), + p_to_replicas + .map(q!(|((slot, _ballot), (value, _))| (slot, value))) + .drop_timestamp(), a_log.map(q!(|(_ckpnt, log)| log)), - fails.map(q!(|(_, ballot)| ballot)).all_ticks(), + fails.map(q!(|(_, ballot)| ballot)).drop_timestamp(), ) } @@ -601,8 +653,7 @@ enum CheckpointOrP2a

{ fn index_payloads<'a, P: PaxosPayload>( proposer_tick: &Tick>, p_max_slot: Optional>, Bounded>, - c_to_proposers: Stream, Unbounded>, - p_is_leader: Optional<(), Tick>, Bounded>, + c_to_proposers: Stream>, Bounded>, ) -> Stream<(usize, P), Tick>, Bounded> { let (p_next_slot_complete_cycle, p_next_slot) = proposer_tick.cycle_with_initial::>(proposer_tick.singleton(q!(0))); @@ -611,8 +662,6 @@ fn index_payloads<'a, P: PaxosPayload>( let base_slot = p_next_slot_after_reconciling_p1bs.unwrap_or(p_next_slot); let p_indexed_payloads = c_to_proposers - .tick_batch(proposer_tick) - .continue_if(p_is_leader) .enumerate() .cross_singleton(base_slot.clone()) .map(q!(|((index, payload), base_slot)| ( @@ -645,19 +694,37 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( proposers: &Cluster<'a, Proposer>, f: usize, ) -> ( - Singleton<(Option, HashMap>), Tick>, Bounded>, + Singleton< + (Option, HashMap>), + Timestamped>, + Unbounded, + >, Stream<((usize, Ballot), Result<(), Ballot>), Cluster<'a, Proposer>, Unbounded, NoOrder>, ) { - let p_to_acceptors_p2a_batch = p_to_acceptors_p2a.tick_batch(acceptor_tick); + let p_to_acceptors_p2a_batch = unsafe { + // SAFETY: we use batches to ensure that the log is updated before sending + // a confirmation to the proposer. Because we use `persist()` on these + // messages before folding into the log, non-deterministic batch boundaries + // will not affect the eventual log state. + p_to_acceptors_p2a.timestamped(acceptor_tick).tick_batch() + }; // Get the latest checkpoint sequence per replica - let a_checkpoint_largest_seqs = r_to_acceptors_checkpoint - .tick_prefix(acceptor_tick) - .reduce_keyed_commutative(q!(|curr_seq, seq| { - if seq > *curr_seq { - *curr_seq = seq; - } - })); + let a_checkpoint_largest_seqs = unsafe { + // SAFETY: if a checkpoint is delayed, its effect is that the log may contain slots + // that do not need to be saved (because the data is at all replicas). 
This affects + // the logs that will be collected during a leader re-election, but eventually the + // same checkpoint will arrive at acceptors and those slots will be eventually deleted. + r_to_acceptors_checkpoint + .timestamped(acceptor_tick) + .tick_batch() + } + .persist() + .reduce_keyed_commutative(q!(|curr_seq, seq| { + if seq > *curr_seq { + *curr_seq = seq; + } + })); let a_checkpoints_quorum_reached = a_checkpoint_largest_seqs.clone().count().filter_map(q!( move |num_received| if num_received == f + 1 { Some(true) @@ -686,7 +753,7 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( )); let a_log = a_p2as_to_place_in_log .union(a_new_checkpoint.into_stream()) - .persist() + .all_ticks() .fold_commutative( q!(|| (None, HashMap::new())), q!(|(prev_checkpoint, log), checkpoint_or_p2a| { diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index f8b8d9625d0..89ac8583b1f 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -44,41 +44,70 @@ pub fn paxos_bench<'a>( ballot ))) .max() - .map(q!(|ballot: Ballot| ballot.proposer_id)) - .latest_tick(&client_tick); + .map(q!(|ballot: Ballot| ballot.proposer_id)); - let leader_changed = cur_leader_id.clone().delta().map(q!(|_| ())).all_ticks(); + let leader_changed = unsafe { + // SAFETY: we are okay if we miss a transient leader ID, because we + // will eventually get the latest one and can restart requests then + cur_leader_id + .clone() + .timestamped(&client_tick) + .latest_tick() + .delta() + .map(q!(|_| ())) + .all_ticks() + .drop_timestamp() + }; bench_client( &clients, leader_changed, |c_to_proposers| { - let (new_leader_elected, processed_payloads) = paxos_kv( - &proposers, - &acceptors, - &replicas, + let to_proposers = unsafe { + // SAFETY: the risk here is that we send a batch of requests + // with a stale leader ID, but because the leader ID comes from the + // network there is no way to guarantee 
that it is up to date + + // TODO(shadaj): we should retry if we get an error due to sending + // to a stale leader c_to_proposers - .tick_batch(&client_tick) - .cross_singleton(cur_leader_id) + .timestamped(&client_tick) + .tick_batch() + .cross_singleton(cur_leader_id.timestamped(&client_tick).latest_tick()) .all_ticks() - .map(q!(move |((key, value), leader_id)| (leader_id, KvPayload { - key, - // we use our ID as part of the value and use that so the replica only notifies us - value: ( - CLUSTER_SELF_ID, - value - ) - }))) - .send_bincode_interleaved(&proposers) - // clients "own" certain keys, so interleaving elements from clients will not affect - // the order of writes to the same key - .assume_ordering(), - f, - i_am_leader_send_timeout, - i_am_leader_check_timeout, - i_am_leader_check_timeout_delay_multiplier, - checkpoint_frequency, - ); + } + .map(q!(move |((key, value), leader_id)| ( + leader_id, + KvPayload { + key, + // we use our ID as part of the value and use that so the replica only notifies us + value: (CLUSTER_SELF_ID, value) + } + ))) + .send_bincode_interleaved(&proposers); + + let to_proposers = unsafe { + // SAFETY: clients "own" certain keys, so interleaving elements from clients will not affect + // the order of writes to the same key + to_proposers.assume_ordering() + }; + + let (new_leader_elected, processed_payloads) = unsafe { + // SAFETY: Non-deterministic leader notifications are handled in `to_proposers`. We do not + // care about the order in which key writes are processed, which is the non-determinism in + // `processed_payloads`. 
+ paxos_kv( + &proposers, + &acceptors, + &replicas, + to_proposers, + f, + i_am_leader_send_timeout, + i_am_leader_check_timeout, + i_am_leader_check_timeout_delay_multiplier, + checkpoint_frequency, + ) + }; new_leader_elected_complete .complete(new_leader_elected.broadcast_bincode_interleaved(&clients)); @@ -91,14 +120,13 @@ pub fn paxos_bench<'a>( .send_bincode_interleaved(&clients); // we only mark a transaction as committed when all replicas have applied it - let (c_quorum_payloads, _) = collect_quorum::<_, _, _, _, ()>( - &client_tick, - c_received_payloads.tick_batch(&client_tick), + let (c_quorum_payloads, _) = collect_quorum::<_, _, _, ()>( + c_received_payloads.timestamped(&client_tick), f + 1, f + 1, ); - c_quorum_payloads.keys().all_ticks() + c_quorum_payloads.drop_timestamp() }, num_clients_per_node, median_latency_window_size, @@ -120,9 +148,17 @@ fn bench_client<'a>( // r_to_clients_payload_applied.clone().inspect(q!(|payload: &(u32, ReplicaPayload)| println!("Client received payload: {:?}", payload))); // Whenever the leader changes, make all clients send a message - let restart_this_tick = trigger_restart.tick_batch(&client_tick).last(); + let restart_this_tick = unsafe { + // SAFETY: non-deterministic delay in restarting requests + // is okay because once it is restarted statistics should reach + // steady state regardless of when the restart happes + trigger_restart + .timestamped(&client_tick) + .tick_batch() + .last() + }; - let c_new_payloads_when_restart = restart_this_tick.clone().flat_map(q!(move |_| (0 + let c_new_payloads_when_restart = restart_this_tick.clone().flat_map_ordered(q!(move |_| (0 ..num_clients_per_node) .map(move |i| ( (CLUSTER_SELF_ID.raw_id * (num_clients_per_node as u32)) + i as u32, @@ -131,19 +167,30 @@ fn bench_client<'a>( let (c_to_proposers_complete_cycle, c_to_proposers) = clients.forward_ref::>(); - let c_received_quorum_payloads = transaction_cycle(c_to_proposers).tick_batch(&client_tick); + let 
c_received_quorum_payloads = unsafe { + // SAFETY: because the transaction processor is required to handle arbitrary reordering + // across *different* keys, we are safe because delaying a transaction result for a key + // will only affect when the next request for that key is emitted with respect to other + // keys + transaction_cycle(c_to_proposers) + .timestamped(&client_tick) + .tick_batch() + }; // Whenever all replicas confirm that a payload was committed, send another payload let c_new_payloads_when_committed = c_received_quorum_payloads .clone() .map(q!(|payload| (payload.0, payload.1 + 1))); c_to_proposers_complete_cycle.complete( - // we don't send a new write for the same key until the previous one is committed, - // so writes to the same key are ordered c_new_payloads_when_restart - .union(c_new_payloads_when_committed) + .chain(unsafe { + // SAFETY: we don't send a new write for the same key until the previous one is committed, + // so this contains only a single write per key, and we don't care about order + // across keys + c_new_payloads_when_committed.assume_ordering() + }) .all_ticks() - .assume_ordering(), + .drop_timestamp(), ); // Track statistics @@ -151,7 +198,7 @@ fn bench_client<'a>( client_tick.cycle::>(); let c_new_timers_when_leader_elected = restart_this_tick .map(q!(|_| SystemTime::now())) - .flat_map(q!( + .flat_map_ordered(q!( move |now| (0..num_clients_per_node).map(move |virtual_id| (virtual_id, now)) )); let c_updated_timers = c_received_quorum_payloads @@ -168,10 +215,14 @@ fn bench_client<'a>( })); c_timers_complete_cycle.complete_next_tick(c_new_timers); - let c_stats_output_timer = clients - .source_interval(q!(Duration::from_secs(1))) - .tick_batch(&client_tick) - .first(); + let c_stats_output_timer = unsafe { + // SAFETY: intentionally sampling statistics + clients + .source_interval(q!(Duration::from_secs(1))) + .timestamped(&client_tick) + .tick_batch() + } + .first(); let c_latency_reset = 
c_stats_output_timer.clone().map(q!(|_| None)).defer_tick(); @@ -182,7 +233,7 @@ fn bench_client<'a>( ))) .union(c_latency_reset.into_stream()) .all_ticks() - .flatten() + .flatten_ordered() .fold_commutative( // Create window with ring buffer using vec + wraparound index // TODO: Would be nice if I could use vec![] instead, but that doesn't work in HF+ with RuntimeData *median_latency_window_size @@ -230,21 +281,22 @@ fn bench_client<'a>( }), ); - c_latencies - .zip(c_throughput) - .latest_tick(&client_tick) - .continue_if(c_stats_output_timer) - .all_ticks() - .for_each(q!(move |(latencies, throughput)| { - let mut latencies_mut = latencies.borrow_mut(); - if latencies_mut.len() > 0 { - let middle_idx = latencies_mut.len() / 2; - let (_, median, _) = latencies_mut.select_nth_unstable(middle_idx); - println!("Median latency: {}ms", (*median) as f64 / 1000.0); - } + unsafe { + // SAFETY: intentionally sampling statistics + c_latencies.zip(c_throughput).latest_tick() + } + .continue_if(c_stats_output_timer) + .all_ticks() + .for_each(q!(move |(latencies, throughput)| { + let mut latencies_mut = latencies.borrow_mut(); + if latencies_mut.len() > 0 { + let middle_idx = latencies_mut.len() / 2; + let (_, median, _) = latencies_mut.select_nth_unstable(middle_idx); + println!("Median latency: {}ms", (*median) as f64 / 1000.0); + } - println!("Throughput: {} requests/s", throughput); - })); + println!("Throughput: {} requests/s", throughput); + })); // End track statistics } diff --git a/hydroflow_plus_test/src/cluster/paxos_kv.rs b/hydroflow_plus_test/src/cluster/paxos_kv.rs index 83d1ff0bfd8..3683abc2d06 100644 --- a/hydroflow_plus_test/src/cluster/paxos_kv.rs +++ b/hydroflow_plus_test/src/cluster/paxos_kv.rs @@ -42,12 +42,17 @@ impl PartialOrd for SequencedKv { } } +/// Sets up a linearizable key-value store using Paxos. +/// +/// # Safety +/// Notifications for leader election are non-deterministic. 
When the leader is changing, +/// writes may be dropped by the old leader. #[expect( clippy::type_complexity, clippy::too_many_arguments, reason = "internal paxos code // TODO" )] -pub fn paxos_kv<'a, K: KvKey, V: KvValue>( +pub unsafe fn paxos_kv<'a, K: KvKey, V: KvValue>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, replicas: &Cluster<'a, Replica>, @@ -64,16 +69,19 @@ pub fn paxos_kv<'a, K: KvKey, V: KvValue>( let (r_to_acceptors_checkpoint_complete_cycle, r_to_acceptors_checkpoint) = replicas.forward_ref::>(); - let (p_to_clients_new_leader_elected, p_to_replicas) = paxos_core( - proposers, - acceptors, - r_to_acceptors_checkpoint.broadcast_bincode(acceptors), - c_to_proposers, - f, - i_am_leader_send_timeout, - i_am_leader_check_timeout, - i_am_leader_check_timeout_delay_multiplier, - ); + let (p_to_clients_new_leader_elected, p_to_replicas) = unsafe { + // SAFETY: Leader election non-determinism and non-deterministic dropping of writes is documented. + paxos_core( + proposers, + acceptors, + r_to_acceptors_checkpoint.broadcast_bincode(acceptors), + c_to_proposers, + f, + i_am_leader_send_timeout, + i_am_leader_check_timeout, + i_am_leader_check_timeout_delay_multiplier, + ) + }; let (r_to_acceptors_checkpoint_new, r_new_processed_payloads) = replica( replicas, @@ -102,8 +110,13 @@ pub fn replica<'a, K: KvKey, V: KvValue>( let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = replica_tick.cycle(); // p_to_replicas.inspect(q!(|payload: ReplicaPayload| println!("Replica received payload: {:?}", payload))); - let r_sorted_payloads = p_to_replicas - .tick_batch(&replica_tick) + let r_sorted_payloads = unsafe { + // SAFETY: because we fill slots one-by-one, we can safely batch + // because non-determinism is resolved when we sort by slots + p_to_replicas + .timestamped(&replica_tick) + .tick_batch() + } .union(r_buffered_payloads) // Combine with all payloads that we've received and not processed yet .sort(); // Create a cycle 
since we'll use this seq before we define it @@ -183,5 +196,8 @@ pub fn replica<'a, K: KvKey, V: KvValue>( let r_to_clients = r_processable_payloads .filter_map(q!(|payload| payload.kv)) .all_ticks(); - (r_checkpoint_seq_new.all_ticks(), r_to_clients) + ( + r_checkpoint_seq_new.all_ticks().drop_timestamp(), + r_to_clients.drop_timestamp(), + ) } diff --git a/hydroflow_plus_test/src/cluster/quorum.rs b/hydroflow_plus_test/src/cluster/quorum.rs index 7990cba7c23..b0fa2c8c369 100644 --- a/hydroflow_plus_test/src/cluster/quorum.rs +++ b/hydroflow_plus_test/src/cluster/quorum.rs @@ -1,10 +1,11 @@ use std::hash::Hash; use hydroflow_plus::*; +use location::tick::Timestamped; use location::NoTick; #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] -pub fn collect_quorum< +pub fn collect_quorum_with_response< 'a, L: Location<'a> + NoTick, Order, @@ -12,17 +13,21 @@ pub fn collect_quorum< V: Clone, E: Clone, >( - tick: &Tick, - responses: Stream<(K, Result), Tick, Bounded, Order>, + responses: Stream<(K, Result), Timestamped, Unbounded, Order>, min: usize, max: usize, ) -> ( - Stream<(K, V), Tick, Bounded, Order>, - Stream<(K, E), Tick, Bounded, Order>, + Stream<(K, V), Timestamped, Unbounded, Order>, + Stream<(K, E), Timestamped, Unbounded, Order>, ) { + let tick = responses.timestamp_source(); let (not_all_complete_cycle, not_all) = tick.cycle::>(); - let current_responses = not_all.union(responses.clone()); + let current_responses = not_all.union(unsafe { + // SAFETY: we always persist values that have not reached quorum, so even + // with arbitrary batching we always produce deterministic quorum results + responses.clone().tick_batch() + }); let count_per_key = current_responses.clone().fold_keyed_commutative( q!(move || (0, 0)), @@ -73,11 +78,8 @@ pub fn collect_quorum< min_but_not_max_complete_cycle .complete_next_tick(reached_min_count.filter_not_in(received_from_all.clone())); - not_all_complete_cycle.complete_next_tick( - current_responses - 
.clone() - .anti_join(received_from_all.clone()), - ); + not_all_complete_cycle + .complete_next_tick(current_responses.clone().anti_join(received_from_all)); current_responses .anti_join(not_reached_min_count) @@ -85,10 +87,90 @@ pub fn collect_quorum< }; ( - just_reached_quorum.filter_map(q!(move |(key, res)| match res { - Ok(v) => Some((key, v)), - Err(_) => None, + just_reached_quorum + .filter_map(q!(move |(key, res)| match res { + Ok(v) => Some((key, v)), + Err(_) => None, + })) + .all_ticks(), + responses.filter_map(q!(move |(key, res)| match res { + Ok(_) => None, + Err(e) => Some((key, e)), })), + ) +} + +#[expect(clippy::type_complexity, reason = "quorum types are complex")] +pub fn collect_quorum<'a, L: Location<'a> + NoTick, Order, K: Clone + Eq + Hash, E: Clone>( + responses: Stream<(K, Result<(), E>), Timestamped, Unbounded, Order>, + min: usize, + max: usize, +) -> ( + Stream, Unbounded, Order>, + Stream<(K, E), Timestamped, Unbounded, Order>, +) { + let tick = responses.timestamp_source(); + let (not_all_complete_cycle, not_all) = tick.cycle::>(); + + let current_responses = not_all.union(unsafe { + // SAFETY: we always persist values that have not reached quorum, so even + // with arbitrary batching we always produce deterministic quorum results + responses.clone().tick_batch() + }); + + let count_per_key = current_responses.clone().fold_keyed_commutative( + q!(move || (0, 0)), + q!(move |accum, value| { + if value.is_ok() { + accum.0 += 1; + } else { + accum.1 += 1; + } + }), + ); + + let reached_min_count = + count_per_key + .clone() + .filter_map(q!(move |(key, (success, _error))| if success >= min { + Some(key) + } else { + None + })); + + let just_reached_quorum = if max == min { + not_all_complete_cycle.complete_next_tick( + current_responses + .clone() + .anti_join(reached_min_count.clone()), + ); + + reached_min_count + } else { + let (min_but_not_max_complete_cycle, min_but_not_max) = tick.cycle(); + + let received_from_all = + 
count_per_key.filter_map(q!( + move |(key, (success, error))| if (success + error) >= max { + Some(key) + } else { + None + } + )); + + min_but_not_max_complete_cycle.complete_next_tick( + reached_min_count + .clone() + .filter_not_in(received_from_all.clone()), + ); + + not_all_complete_cycle.complete_next_tick(current_responses.anti_join(received_from_all)); + + reached_min_count.filter_not_in(min_but_not_max) + }; + + ( + just_reached_quorum.all_ticks(), responses.filter_map(q!(move |(key, res)| match res { Ok(_) => None, Err(e) => Some((key, e)), diff --git a/hydroflow_plus_test/src/cluster/request_response.rs b/hydroflow_plus_test/src/cluster/request_response.rs index 4c8e71af120..945baa0e456 100644 --- a/hydroflow_plus_test/src/cluster/request_response.rs +++ b/hydroflow_plus_test/src/cluster/request_response.rs @@ -1,18 +1,21 @@ use std::hash::Hash; use hydroflow_plus::*; +use location::tick::Timestamped; use location::NoTick; use stream::NoOrder; -type JoinResponses = Stream<(K, (M, V)), Tick, Bounded, NoOrder>; +type JoinResponses = Stream<(K, (M, V)), Timestamped, Unbounded, NoOrder>; /// Given an incoming stream of request-response responses, joins with metadata generated /// at request time that is stored in-memory. /// -/// Only one response element should be produced with a given key, same for the metadata stream. +/// The metadata must be generated in the same or a previous tick than the response, +/// typically at request time. Only one response element should be produced with a given +/// key, same for the metadata stream. 
pub fn join_responses<'a, K: Clone + Eq + Hash, M: Clone, V: Clone, L: Location<'a> + NoTick>( tick: &Tick, - responses: Stream<(K, V), Tick, Bounded, NoOrder>, + responses: Stream<(K, V), Timestamped, Unbounded, NoOrder>, metadata: Stream<(K, M), Tick, Bounded, NoOrder>, ) -> JoinResponses { let (remaining_to_join_complete_cycle, remaining_to_join) = @@ -20,6 +23,12 @@ pub fn join_responses<'a, K: Clone + Eq + Hash, M: Clone, V: Clone, L: Location< let remaining_and_new: Stream<(K, M), Tick, Bounded, _> = remaining_to_join.union(metadata); + let responses = unsafe { + // SAFETY: because we persist the metadata, delays resulting from + // batching boundaries do not affect the output contents. + responses.tick_batch() + }; + // TODO(shadaj): we should have a "split-join" operator // that returns both join and anti-join without cloning let joined_this_tick = @@ -31,5 +40,5 @@ pub fn join_responses<'a, K: Clone + Eq + Hash, M: Clone, V: Clone, L: Location< remaining_to_join_complete_cycle .complete_next_tick(remaining_and_new.anti_join(responses.map(q!(|(key, _)| key)))); - joined_this_tick + joined_this_tick.all_ticks() } diff --git a/hydroflow_plus_test/src/cluster/simple_cluster.rs b/hydroflow_plus_test/src/cluster/simple_cluster.rs index 0a65ae3e81a..64b7e5045f9 100644 --- a/hydroflow_plus_test/src/cluster/simple_cluster.rs +++ b/hydroflow_plus_test/src/cluster/simple_cluster.rs @@ -35,12 +35,10 @@ pub fn simple_cluster<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, ()>, Cluster<' ids.cross_product(numbers) .map(q!(|(id, n)| (id, (id, n)))) .send_bincode(&cluster) - .tick_batch(&cluster.tick()) .inspect(q!(move |n| println!( "cluster received: {:?} (self cluster id: {})", n, CLUSTER_SELF_ID ))) - .all_ticks() .send_bincode(&process) .for_each(q!(|(id, d)| println!("node received: ({}, {:?})", id, d))); diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap 
b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index 392e8bfc87f..b960be7d85a 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -321,39 +321,39 @@ expression: built.ir() inner: , }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < tokio :: time :: Instant > , ()) , core :: option :: Option < tokio :: time :: Instant > > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (() , ()) , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), input: CrossSingleton( - Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout__free = 1u64 ; move | latest_received_i_am_leader | { if let Some (latest_received_i_am_leader) = latest_received_i_am_leader { (Instant :: now () . 
duration_since (* latest_received_i_am_leader)) > Duration :: from_secs (i_am_leader_check_timeout__free) } else { true } } }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < tokio :: time :: Instant > , ()) , core :: option :: Option < tokio :: time :: Instant > > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < tokio :: time :: Instant > > ({ use crate :: __staged :: cluster :: paxos :: * ; | | None }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | latest , _ | { * latest = Some (Instant :: now ()) ; } }), + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (() , ()) , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | (d , _signal) | d }), + input: CrossSingleton( + FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , core :: option :: Option < () > > ({ use hydroflow_plus :: __staged :: stream :: * ; let duration__free = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout__free = 1u64 ; Duration :: from_secs (i_am_leader_check_timeout__free) } ; move | latest_received | { if let Some (latest_received) = latest_received { if Instant :: now () . 
duration_since (latest_received) > duration__free { Some (()) } else { None } } else { Some (()) } } }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < tokio :: time :: Instant > > ({ use hydroflow_plus :: __staged :: stream :: * ; | | None }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | latest , _ | { * latest = Some (Instant :: now ()) ; } }), input: Persist( Tee { inner: , }, ), }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), - input: Tee { - inner: , - }, + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), + input: Filter { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: optional :: * ; | c | * c == 0 }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + input: Tee { + inner: , }, }, }, - ), - }, + }, + ), }, Map { f: 
stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: optional :: * ; | _u | () }), @@ -740,27 +740,27 @@ expression: built.ir() ), input: DeferTick( Difference( - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), - input: Tee { - inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use crate :: __staged :: cluster :: quorum :: * ; move | | (0 , 0) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot > , () > ({ use crate :: __staged :: cluster :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 
1 += 1 ; } } }), - input: Tee { - inner: : Chain( - CycleSource { - ident: Ident { - sym: cycle_8, - }, - location_kind: Tick( - 2, - Cluster( - 0, + Tee { + inner: : FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), + input: Tee { + inner: : FoldKeyed { + init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use crate :: __staged :: cluster :: quorum :: * ; move | | (0 , 0) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot > , () > ({ use crate :: __staged :: cluster :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 
1 += 1 ; } } }), + input: Tee { + inner: : Chain( + CycleSource { + ident: Ident { + sym: cycle_8, + }, + location_kind: Tick( + 2, + Cluster( + 0, + ), ), - ), - }, - Tee { - inner: : Tee { + }, + Tee { inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >)) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { @@ -910,8 +910,8 @@ expression: built.ir() }, }, }, - }, - ), + ), + }, }, }, }, @@ -920,7 +920,7 @@ expression: built.ir() inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let max__free = 3usize ; move | (key , (success , error)) | if (success + error) >= max__free { Some (key) } else { None } }), input: Tee { - inner: , + inner: , }, }, }, @@ -940,7 +940,7 @@ expression: built.ir() input: DeferTick( AntiJoin( Tee { - inner: , + inner: , }, Tee { inner: , @@ -983,40 +983,22 @@ expression: built.ir() input: Tee { inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ()) > ({ use crate :: __staged :: cluster :: paxos :: * ; | k | (k , ()) }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ()) , (usize , hydroflow_plus_test :: 
cluster :: paxos :: Ballot) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (k , _) | k }), - input: FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydroflow_plus :: __staged :: stream :: * ; | | () }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _ , _ | { } }), - input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ()) > > ({ use crate :: __staged :: cluster :: quorum :: * ; move | (key , res) | match res { Ok (v) => Some ((key , v)) , Err (_) => None , } }), - input: AntiJoin( - AntiJoin( - Tee { - inner: , - }, - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success < min__free { Some (key) } else { None } }), - input: Tee { - inner: , - }, - }, - ), - CycleSource { - ident: Ident { - sym: cycle_9, - }, - location_kind: Tick( - 2, - Cluster( - 0, - ), - ), - }, - ), + input: Difference( + Tee { + inner: , + }, + CycleSource { + ident: Ident { + sym: cycle_9, }, + location_kind: Tick( + 2, + Cluster( + 0, + ), + ), }, - }, + ), }, }, }, @@ -1146,7 +1128,7 @@ expression: built.ir() input: FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < ((usize , hydroflow_plus_test :: cluster 
:: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), input: Tee { - inner: , + inner: , }, }, }, @@ -1156,7 +1138,7 @@ expression: built.ir() sym: cycle_1, }, location_kind: Tick( - 7, + 8, Cluster( 3, ), @@ -1229,7 +1211,7 @@ expression: built.ir() sym: cycle_1, }, location_kind: Tick( - 7, + 8, Cluster( 3, ), @@ -1256,7 +1238,7 @@ expression: built.ir() sym: cycle_2, }, location_kind: Tick( - 7, + 8, Cluster( 3, ), @@ -1288,7 +1270,7 @@ expression: built.ir() sym: cycle_2, }, location_kind: Tick( - 7, + 8, Cluster( 3, ), @@ -1328,7 +1310,7 @@ expression: built.ir() sym: cycle_3, }, location_kind: Tick( - 7, + 8, Cluster( 3, ), @@ -1349,7 +1331,7 @@ expression: built.ir() sym: cycle_3, }, location_kind: Tick( - 7, + 8, Cluster( 3, ), @@ -1539,14 +1521,16 @@ expression: built.ir() }, ), }, - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , (usize , usize)) , core :: option :: Option < (u32 , u32) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), - input: Tee { - inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use crate :: __staged :: cluster :: quorum :: * ; move | | (0 , 0) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , () > , () > ({ use crate :: __staged :: cluster :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 
1 += 1 ; } } }), - input: Tee { - inner: , + Tee { + inner: : FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , (usize , usize)) , core :: option :: Option < (u32 , u32) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), + input: Tee { + inner: : FoldKeyed { + init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use crate :: __staged :: cluster :: quorum :: * ; move | | (0 , 0) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , () > , () > ({ use crate :: __staged :: cluster :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), + input: Tee { + inner: , + }, }, }, }, @@ -1565,7 +1549,7 @@ expression: built.ir() FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < () , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let CLUSTER_SELF_ID__free = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; let num_clients_per_node__free = 1usize ; move | _ | (0 .. num_clients_per_node__free) . map (move | i | ((CLUSTER_SELF_ID__free . 
raw_id * (num_clients_per_node__free as u32)) + i as u32 , 0)) }), input: Tee { - inner: : Reduce { + inner: : Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | * curr = new }), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | () }), @@ -1581,26 +1565,8 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , u32) , (u32 , u32) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (payload . 0 , payload . 1 + 1) }), input: Tee { - inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , ()) , (u32 , u32) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (k , _) | k }), - input: FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < () > ({ use hydroflow_plus :: __staged :: stream :: * ; | | () }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < () , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _ , _ | { } }), - input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , core :: result :: Result < () , () >) , core :: option :: Option < ((u32 , u32) , ()) > > ({ use crate :: __staged :: cluster :: quorum :: * ; move | (key , res) | match res { Ok (v) => Some ((key , v)) , Err (_) => None , } }), - input: AntiJoin( - Tee { - inner: , - }, - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , (usize , usize)) , core :: option :: Option < (u32 , u32) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success < min__free { Some (key) } else { None } }), - input: Tee { - inner: , - }, - }, - 
), - }, - }, + inner: : Tee { + inner: , }, }, }, @@ -1622,7 +1588,7 @@ expression: built.ir() input: Chain( Chain( Tee { - inner: : CycleSource { + inner: : CycleSource { ident: Ident { sym: cycle_3, }, @@ -1639,16 +1605,16 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < () , std :: time :: SystemTime > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | SystemTime :: now () }), input: Tee { - inner: , + inner: , }, }, }, ), Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , u32) , (usize , std :: time :: SystemTime) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (key , _prev_count) | (key as usize , SystemTime :: now ()) }), input: Tee { - inner: , + inner: , }, }, }, @@ -1675,10 +1641,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (std :: time :: SystemTime , std :: time :: SystemTime)) , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time) . unwrap () . 
as_micros ()) }), input: Join( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1686,7 +1652,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }), input: Tee { - inner: : Source { + inner: : Source { source: Stream( { use hydroflow_plus :: __staged :: location :: * ; let interval__free = { use crate :: __staged :: cluster :: paxos_bench :: * ; Duration :: from_secs (1) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval (interval__free)) }, ), @@ -1716,7 +1682,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (u32 , u32) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, Map { @@ -1727,7 +1693,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -1739,7 +1705,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | (0 , true) }), input: Tee { - inner: , + inner: , }, }, ), @@ -1750,7 +1716,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), input: Tee { - inner: , + inner: , }, }, ), 
diff --git a/hydroflow_plus_test/src/cluster/two_pc.rs b/hydroflow_plus_test/src/cluster/two_pc.rs index dd89438e0b0..4a5f8220cb6 100644 --- a/hydroflow_plus_test/src/cluster/two_pc.rs +++ b/hydroflow_plus_test/src/cluster/two_pc.rs @@ -1,5 +1,7 @@ use hydroflow_plus::*; +use super::quorum::collect_quorum; + // if the variable start with p, that means current work is at the participant side. if start with c, at coordinator side. // @@ -32,7 +34,7 @@ pub fn two_pc<'a>( let c_receive_client_transactions = client_transaction.send_bincode(&coordinator); c_receive_client_transactions .clone() - .inspect(q!(|t| println!( + .for_each(q!(|t| println!( "receive transaction {}, ready to broadcast", t ))); @@ -40,47 +42,46 @@ pub fn two_pc<'a>( // broadcast prepare message to participants. let p_receive_prepare = c_receive_client_transactions.broadcast_bincode(&participants); - // assume all participants reply commit - let p_ready_to_commit = p_receive_prepare.map(q!(|t| (t, String::from("commit")))); + // participant 1 aborts transaction 1 + let p_ready_to_commit = p_receive_prepare.map(q!(move |t| ( + t, + if t == 1 && CLUSTER_SELF_ID.raw_id == 1 { + "abort".to_string() + } else { + "commit".to_string() + } + ))); let c_received_reply = p_ready_to_commit.send_bincode(&coordinator); // c_received_reply.clone().inspect(q!(|(id, (t, reply))| println!("participant {id} said {reply} for transaction {t}"))); // collect votes from participant. - // aborted transactions. 
- let c_participant_voted_abort = c_received_reply - .clone() - .filter(q!(|(_id, (_t, reply))| reply == "abort")) - .map(q!(|(id, (t, _reply))| (t, id))); - let p_receive_abort = c_participant_voted_abort.broadcast_bincode(&participants); - p_receive_abort.clone().inspect(q!(|(t, id)| println!( - "{} vote abort for transaction {}", - id, t - ))); + let coordinator_tick = coordinator.tick(); + let (c_all_commit, c_participant_voted_abort) = collect_quorum( + c_received_reply + .map(q!(|(id, (t, reply))| ( + t, + if reply == "commit" { Ok(()) } else { Err(id) } + ))) + .timestamped(&coordinator_tick), + num_participants as usize, + num_participants as usize, + ); + + let p_receive_abort = c_participant_voted_abort + // TODO(shadaj): if multiple participants vote abort we should deduplicate + .inspect(q!(|(t, id)| println!( + "{} vote abort for transaction {}", + id, t + ))) + .broadcast_bincode(&participants); let c_receive_ack = p_receive_abort.send_bincode(&coordinator); c_receive_ack.for_each(q!(|(id, (t, _))| println!( "Coordinator receive participant {} abort for transaction {}", id, t ))); - // committed transactions - let c_participant_voted_commit = c_received_reply - .filter(q!(|(_id, (_t, reply))| reply == "commit")) - .map(q!(|(id, (t, _reply))| (t, id))) - // fold_keyed: 1 input stream of type (K, V1), 1 output stream of type (K, V2). - // The output will have one tuple for each distinct K, with an accumulated value of type V2. - .tick_batch(&coordinator.tick()).fold_keyed_commutative(q!(|| 0), q!(|old: &mut u32, _| *old += 1)).filter_map(q!(move |(t, count)| { - // here I set the participant to 3. If want more or less participant, fix line 26 of examples/broadcast.rs - if count == num_participants { - Some(t) - } else { - None - } - })); - // broadcast commit transactions to participants. 
- let p_receive_commit = c_participant_voted_commit - .all_ticks() - .broadcast_bincode(&participants); + let p_receive_commit = c_all_commit.broadcast_bincode(&participants); // p_receive_commit.clone().for_each(q!(|t| println!("commit for transaction {}", t))); let c_receive_ack = p_receive_commit.send_bincode(&coordinator); diff --git a/hydroflow_plus_test_local/src/local/chat_app.rs b/hydroflow_plus_test_local/src/local/chat_app.rs index 566b5effbe0..26c53a528ae 100644 --- a/hydroflow_plus_test_local/src/local/chat_app.rs +++ b/hydroflow_plus_test_local/src/local/chat_app.rs @@ -14,15 +14,27 @@ pub fn chat_app<'a>( let process = flow.process::<()>(); let tick = process.tick(); - let users = process - .source_stream(users_stream) - .tick_batch(&tick) - .persist(); + let users = unsafe { + // SAFETY: intentionally non-deterministic to not send messaged + // to users that joined after the message was sent + process + .source_stream(users_stream) + .timestamped(&tick) + .tick_batch() + } + .persist(); let messages = process.source_stream(messages); let messages = if replay_messages { - messages.tick_batch(&tick).persist() + unsafe { + // SAFETY: see above + messages.timestamped(&tick).tick_batch() + } + .persist() } else { - messages.tick_batch(&tick) + unsafe { + // SAFETY: see above + messages.timestamped(&tick).tick_batch() + } }; // do this after the persist to test pullup diff --git a/hydroflow_plus_test_local/src/local/compute_pi.rs b/hydroflow_plus_test_local/src/local/compute_pi.rs index 131c529111b..b790f8d931f 100644 --- a/hydroflow_plus_test_local/src/local/compute_pi.rs +++ b/hydroflow_plus_test_local/src/local/compute_pi.rs @@ -21,21 +21,25 @@ pub fn compute_pi<'a>(flow: &FlowBuilder<'a>, batch_size: RuntimeData) -> *total += 1; }), ) - .all_ticks(); - - trials - .reduce(q!(|(inside, total), (inside_batch, total_batch)| { - *inside += inside_batch; - *total += total_batch; - })) - .sample_every(q!(Duration::from_secs(1))) - .for_each(q!(|(inside, 
total)| { - println!( - "pi: {} ({} trials)", - 4.0 * inside as f64 / total as f64, - total - ); - })); + .all_ticks() + .drop_timestamp(); + + let estimate = trials.reduce(q!(|(inside, total), (inside_batch, total_batch)| { + *inside += inside_batch; + *total += total_batch; + })); + + unsafe { + // SAFETY: intentional non-determinism + estimate.sample_every(q!(Duration::from_secs(1))) + } + .for_each(q!(|(inside, total)| { + println!( + "pi: {} ({} trials)", + 4.0 * inside as f64 / total as f64, + total + ); + })); process } diff --git a/hydroflow_plus_test_local/src/local/count_elems.rs b/hydroflow_plus_test_local/src/local/count_elems.rs index ee670a6bfd0..528bc24db2a 100644 --- a/hydroflow_plus_test_local/src/local/count_elems.rs +++ b/hydroflow_plus_test_local/src/local/count_elems.rs @@ -12,11 +12,12 @@ pub fn count_elems_generic<'a, T: 'a>( let tick = process.tick(); let source = process.source_stream(input_stream); - let count = source - .map(q!(|_| 1)) - .tick_batch(&tick) - .fold(q!(|| 0), q!(|a, b| *a += b)) - .all_ticks(); + let count = unsafe { + // SAFETY: intentionally using ticks + source.map(q!(|_| 1)).timestamped(&tick).tick_batch() + } + .fold(q!(|| 0), q!(|a, b| *a += b)) + .all_ticks(); count.for_each(q!(|v| { output.send(v).unwrap(); diff --git a/hydroflow_plus_test_local/src/local/graph_reachability.rs b/hydroflow_plus_test_local/src/local/graph_reachability.rs index 0d50a88f51a..84a6b4b0799 100644 --- a/hydroflow_plus_test_local/src/local/graph_reachability.rs +++ b/hydroflow_plus_test_local/src/local/graph_reachability.rs @@ -19,11 +19,20 @@ pub fn graph_reachability<'a>( let reachability_tick = process.tick(); let (set_reached_cycle, reached_cycle) = reachability_tick.cycle::>(); - let reached = roots.tick_batch(&reachability_tick).union(reached_cycle); + let reached = unsafe { + // SAFETY: roots can be inserted on any tick because we are fixpointing + roots + .timestamped(&reachability_tick) + .tick_batch() + .union(reached_cycle) + }; 
let reachable = reached .clone() .map(q!(|r| (r, ()))) - .join(edges.tick_batch(&reachability_tick).persist()) + .join(unsafe { + // SAFETY: edges can be inserted on any tick because we are fixpointing + edges.timestamped(&reachability_tick).tick_batch().persist() + }) .map(q!(|(_from, (_, to))| to)); set_reached_cycle.complete_next_tick(reached.clone().union(reachable)); diff --git a/hydroflow_plus_test_local/src/local/negation.rs b/hydroflow_plus_test_local/src/local/negation.rs index a0924f70a93..2999cfdf8ca 100644 --- a/hydroflow_plus_test_local/src/local/negation.rs +++ b/hydroflow_plus_test_local/src/local/negation.rs @@ -12,12 +12,24 @@ pub fn test_difference<'a>( let process = flow.process::<()>(); let tick = process.tick(); - let mut source = process.source_iter(q!(0..5)).tick_batch(&tick); + let mut source = unsafe { + // SAFETY: intentionally using ticks + process + .source_iter(q!(0..5)) + .timestamped(&tick) + .tick_batch() + }; if persist1 { source = source.persist(); } - let mut source2 = process.source_iter(q!(3..6)).tick_batch(&tick); + let mut source2 = unsafe { + // SAFETY: intentionally using ticks + process + .source_iter(q!(3..6)) + .timestamped(&tick) + .tick_batch() + }; if persist2 { source2 = source2.persist(); } @@ -39,15 +51,25 @@ pub fn test_anti_join<'a>( let process = flow.process::<()>(); let tick = process.tick(); - let mut source = process - .source_iter(q!(0..5)) - .map(q!(|v| (v, v))) - .tick_batch(&tick); + let mut source = unsafe { + // SAFETY: intentionally using ticks + process + .source_iter(q!(0..5)) + .map(q!(|v| (v, v))) + .timestamped(&tick) + .tick_batch() + }; if persist1 { source = source.persist(); } - let mut source2 = process.source_iter(q!(3..6)).tick_batch(&tick); + let mut source2 = unsafe { + // SAFETY: intentionally using ticks + process + .source_iter(q!(3..6)) + .timestamped(&tick) + .tick_batch() + }; if persist2 { source2 = source2.persist(); } diff --git a/hydroflow_plus_test_local/src/local/teed_join.rs 
b/hydroflow_plus_test_local/src/local/teed_join.rs index 36e575f4109..96ab6789243 100644 --- a/hydroflow_plus_test_local/src/local/teed_join.rs +++ b/hydroflow_plus_test_local/src/local/teed_join.rs @@ -19,7 +19,13 @@ pub fn teed_join<'a, S: Stream + Unpin + 'a>( let node_one = flow.process::(); let n0_tick = node_zero.tick(); - let source = node_zero.source_stream(input_stream).tick_batch(&n0_tick); + let source = unsafe { + // SAFETY: intentionally using ticks + node_zero + .source_stream(input_stream) + .timestamped(&n0_tick) + .tick_batch() + }; let map1 = source.clone().map(q!(|v| (v + 1, ()))); let map2 = source.map(q!(|v| (v - 1, ()))); diff --git a/stageleft/src/lib.rs b/stageleft/src/lib.rs index d3802633527..ee1cdabaad1 100644 --- a/stageleft/src/lib.rs +++ b/stageleft/src/lib.rs @@ -469,7 +469,6 @@ impl<'a, T: 'a, Ctx> QuotedWithContext<'a, T, Ctx> for RuntimeData {} impl Copy for RuntimeData {} -// TODO(shadaj): relax this to allow for non-copy types impl Clone for RuntimeData { fn clone(&self) -> Self { *self diff --git a/template/hydroflow_plus/Cargo.toml b/template/hydroflow_plus/Cargo.toml index 2c521b6155d..1544bce2112 100644 --- a/template/hydroflow_plus/Cargo.toml +++ b/template/hydroflow_plus/Cargo.toml @@ -23,3 +23,6 @@ hydroflow_plus = { git = "{{ hydroflow_git | default: 'https://github.com/hydro- "deploy", ] } tokio-stream = { version = "0.1.3", default-features = false } + +[lints.rust] +unsafe_op_in_unsafe_fn = "warn"