diff --git a/hydroflow_plus/src/location/external_process.rs b/hydroflow_plus/src/location/external_process.rs index de347154722b..0735083fd328 100644 --- a/hydroflow_plus/src/location/external_process.rs +++ b/hydroflow_plus/src/location/external_process.rs @@ -62,7 +62,7 @@ impl<'a, P> ExternalProcess<'a, P> { pub fn source_external_bytes + NoTick>( &self, to: &L, - ) -> (ExternalBytesPort, Stream) { + ) -> (ExternalBytesPort, Stream) { let next_external_port_id = { let mut flow_state = self.flow_state.borrow_mut(); let id = flow_state.next_external_out; @@ -97,7 +97,7 @@ impl<'a, P> ExternalProcess<'a, P> { pub fn source_external_bincode + NoTick, T: Serialize + DeserializeOwned>( &self, to: &L, - ) -> (ExternalBincodeSink, Stream) { + ) -> (ExternalBincodeSink, Stream) { let next_external_port_id = { let mut flow_state = self.flow_state.borrow_mut(); let id = flow_state.next_external_out; diff --git a/hydroflow_plus/src/location/mod.rs b/hydroflow_plus/src/location/mod.rs index d8dd16750e8d..8908f5827647 100644 --- a/hydroflow_plus/src/location/mod.rs +++ b/hydroflow_plus/src/location/mod.rs @@ -78,7 +78,7 @@ pub trait Location<'a>: Clone { } } - fn spin(&self) -> Stream<(), Unbounded, Self> + fn spin(&self) -> Stream<(), Self, Unbounded> where Self: Sized + NoTick, { @@ -94,7 +94,7 @@ pub trait Location<'a>: Clone { fn source_stream + Unpin>( &self, e: impl Quoted<'a, E>, - ) -> Stream + ) -> Stream where Self: Sized + NoTick, { @@ -112,7 +112,7 @@ pub trait Location<'a>: Clone { fn source_iter>( &self, e: impl Quoted<'a, E>, - ) -> Stream + ) -> Stream where Self: Sized + NoTick, { @@ -129,7 +129,7 @@ pub trait Location<'a>: Clone { ) } - fn singleton(&self, e: impl Quoted<'a, T>) -> Singleton + fn singleton(&self, e: impl Quoted<'a, T>) -> Singleton where Self: Sized + NoTick, { @@ -156,7 +156,7 @@ pub trait Location<'a>: Clone { fn source_interval( &self, interval: impl Quoted<'a, Duration> + Copy + 'a, - ) -> Stream + ) -> Stream where Self: Sized + NoTick, { @@ -169,7 +169,7 @@ pub trait Location<'a>: Clone { &self, delay: impl Quoted<'a, Duration> + Copy + 'a, interval: impl Quoted<'a, Duration> + Copy + 'a, - ) -> Stream + ) -> Stream where Self: Sized + NoTick, { diff --git a/hydroflow_plus/src/location/tick.rs b/hydroflow_plus/src/location/tick.rs index 895ae45559ad..d974eb14acca 100644 --- a/hydroflow_plus/src/location/tick.rs +++ b/hydroflow_plus/src/location/tick.rs @@ -45,7 +45,7 @@ impl<'a, L: Location<'a>> Tick { pub fn spin_batch( &self, batch_size: impl Quoted<'a, usize> + Copy + 'a, - ) -> Stream<(), Bounded, Self> + ) -> Stream<(), Self, Bounded> where L: NoTick, { @@ -56,7 +56,7 @@ impl<'a, L: Location<'a>> Tick { .tick_batch(self) } - pub fn singleton(&self, e: impl Quoted<'a, T>) -> Singleton + pub fn singleton(&self, e: impl Quoted<'a, T>) -> Singleton where L: NoTick, { @@ -66,7 +66,7 @@ impl<'a, L: Location<'a>> Tick { pub fn singleton_first_tick( &self, e: impl Quoted<'a, T>, - ) -> Optional + ) -> Optional where L: NoTick, { diff --git a/hydroflow_plus/src/optional.rs b/hydroflow_plus/src/optional.rs index 7ca84bf6be1f..1494f8a667f7 100644 --- a/hydroflow_plus/src/optional.rs +++ b/hydroflow_plus/src/optional.rs @@ -12,15 +12,15 @@ use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource, TeeNode}; use crate::location::{check_matching_location, LocationId, NoTick}; use crate::{Bounded, Location, Singleton, Stream, Tick, Unbounded}; -pub struct Optional { - pub(crate) location: N, +pub struct Optional { + pub(crate) location: L, pub(crate) ir_node: RefCell, - _phantom: PhantomData<(T, N, W)>, + _phantom: PhantomData<(T, L, B)>, } -impl<'a, T, W, N: Location<'a>> Optional { - pub(crate) fn new(location: N, ir_node: HfPlusNode) -> Self { +impl<'a, T, L: Location<'a>, B> Optional { + pub(crate) fn new(location: L, ir_node: HfPlusNode) -> Self { Optional { location, ir_node: RefCell::new(ir_node), @@ -28,7 +28,7 @@ impl<'a, T, W, N: Location<'a>> Optional { } } - pub fn some(singleton: Singleton) -> Self { + pub fn some(singleton: Singleton) -> Self { Optional::new(singleton.location, singleton.ir_node.into_inner()) } @@ -37,16 +37,16 @@ impl<'a, T, W, N: Location<'a>> Optional { } } -impl<'a, T, N: Location<'a>> DeferTick for Optional> { +impl<'a, T, L: Location<'a>> DeferTick for Optional, Bounded> { fn defer_tick(self) -> Self { Optional::defer_tick(self) } } -impl<'a, T, N: Location<'a>> CycleCollection<'a, TickCycle> for Optional> { - type Location = Tick; +impl<'a, T, L: Location<'a>> CycleCollection<'a, TickCycle> for Optional, Bounded> { + type Location = Tick; - fn create_source(ident: syn::Ident, location: Tick) -> Self { + fn create_source(ident: syn::Ident, location: Tick) -> Self { let location_id = location.id(); Optional::new( location, @@ -58,7 +58,7 @@ impl<'a, T, N: Location<'a>> CycleCollection<'a, TickCycle> for Optional> CycleComplete<'a, TickCycle> for Optional> { +impl<'a, T, L: Location<'a>> CycleComplete<'a, TickCycle> for Optional, Bounded> { fn complete(self, ident: syn::Ident) { self.location .flow_state() @@ -74,10 +74,10 @@ impl<'a, T, N: Location<'a>> CycleComplete<'a, TickCycle> for Optional> CycleCollection<'a, ForwardRef> for Optional> { - type Location = Tick; +impl<'a, T, L: Location<'a>> CycleCollection<'a, ForwardRef> for Optional, Bounded> { + type Location = Tick; - fn create_source(ident: syn::Ident, location: Tick) -> Self { + fn create_source(ident: syn::Ident, location: Tick) -> Self { let location_id = location.id(); Optional::new( location, @@ -89,7 +89,7 @@ impl<'a, T, N: Location<'a>> CycleCollection<'a, ForwardRef> for Optional> CycleComplete<'a, ForwardRef> for Optional> { +impl<'a, T, L: Location<'a>> CycleComplete<'a, ForwardRef> for Optional, Bounded> { fn complete(self, ident: syn::Ident) { self.location .flow_state() @@ -105,10 +105,10 @@ impl<'a, T, N: Location<'a>> CycleComplete<'a, ForwardRef> for Optional + NoTick> CycleCollection<'a, ForwardRef> for Optional { - type Location = N; +impl<'a, T, L: Location<'a> + NoTick, B> CycleCollection<'a, ForwardRef> for Optional { + type Location = L; - fn create_source(ident: syn::Ident, location: N) -> Self { + fn create_source(ident: syn::Ident, location: L) -> Self { let location_id = location.id(); Optional::new( location, @@ -120,7 +120,7 @@ impl<'a, T, W, N: Location<'a> + NoTick> CycleCollection<'a, ForwardRef> for Opt } } -impl<'a, T, W, N: Location<'a> + NoTick> CycleComplete<'a, ForwardRef> for Optional { +impl<'a, T, L: Location<'a> + NoTick, B> CycleComplete<'a, ForwardRef> for Optional { fn complete(self, ident: syn::Ident) { self.location .flow_state() @@ -136,13 +136,19 @@ impl<'a, T, W, N: Location<'a> + NoTick> CycleComplete<'a, ForwardRef> for Optio } } -impl<'a, T, W, N: Location<'a>> From> for Optional { - fn from(singleton: Singleton) -> Self { +impl<'a, T, L: Location<'a>> From> for Optional { + fn from(singleton: Optional) -> Self { + Optional::new(singleton.location, singleton.ir_node.into_inner()) + } +} + +impl<'a, T, L: Location<'a>, B> From> for Optional { + fn from(singleton: Singleton) -> Self { Optional::some(singleton) } } -impl<'a, T: Clone, W, N: Location<'a>> Clone for Optional { +impl<'a, T: Clone, L: Location<'a>, B> Clone for Optional { fn clone(&self) -> Self { if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) { let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder); @@ -166,17 +172,17 @@ impl<'a, T: Clone, W, N: Location<'a>> Clone for Optional { } } -impl<'a, T, W, N: Location<'a>> Optional { +impl<'a, T, L: Location<'a>, B> Optional { // TODO(shadaj): this is technically incorrect; we should only return the first element of the stream - pub fn into_stream(self) -> Stream { - if N::is_top_level() { + pub fn into_stream(self) -> Stream { + if L::is_top_level() { panic!("Converting an optional to a stream is not yet supported at the top level"); } Stream::new(self.location, self.ir_node.into_inner()) } - pub fn map U + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Optional { + pub fn map U + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Optional { Optional::new( self.location, HfPlusNode::Map { @@ -189,7 +195,7 @@ impl<'a, T, W, N: Location<'a>> Optional { pub fn flat_map, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Stream { + ) -> Stream { Stream::new( self.location, HfPlusNode::FlatMap { @@ -199,7 +205,7 @@ impl<'a, T, W, N: Location<'a>> Optional { ) } - pub fn filter bool + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Optional { + pub fn filter bool + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Optional { Optional::new( self.location, HfPlusNode::Filter { @@ -212,7 +218,7 @@ impl<'a, T, W, N: Location<'a>> Optional { pub fn filter_map Option + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Optional { + ) -> Optional { Optional::new( self.location, HfPlusNode::FilterMap { @@ -222,10 +228,10 @@ impl<'a, T, W, N: Location<'a>> Optional { ) } - pub fn union(self, other: Optional) -> Optional { + pub fn union(self, other: Optional) -> Optional { check_matching_location(&self.location, &other.location); - if N::is_top_level() { + if L::is_top_level() { Optional::new( self.location, HfPlusNode::Persist(Box::new(HfPlusNode::Union( @@ -244,14 +250,14 @@ impl<'a, T, W, N: Location<'a>> Optional { } } - pub fn zip(self, other: impl Into>) -> Optional<(T, O), W, N> + pub fn zip(self, other: impl Into>) -> Optional<(T, O), L, B> where O: Clone, { - let other: Optional = other.into(); + let other: Optional = other.into(); check_matching_location(&self.location, &other.location); - if N::is_top_level() { + if L::is_top_level() { Optional::new( self.location, HfPlusNode::Persist(Box::new(HfPlusNode::CrossSingleton( @@ -270,10 +276,10 @@ impl<'a, T, W, N: Location<'a>> Optional { } } - pub fn unwrap_or(self, other: Singleton) -> Singleton { + pub fn unwrap_or(self, other: Singleton) -> Singleton { check_matching_location(&self.location, &other.location); - if N::is_top_level() { + if L::is_top_level() { Singleton::new( self.location, HfPlusNode::Persist(Box::new(HfPlusNode::Union( @@ -292,7 +298,7 @@ impl<'a, T, W, N: Location<'a>> Optional { } } - pub fn into_singleton(self) -> Singleton, W, N> + pub fn into_singleton(self) -> Singleton, L, B> where T: Clone, { @@ -302,7 +308,7 @@ impl<'a, T, W, N: Location<'a>> Optional { location_kind: self.location.id().root().clone(), })); - let none_singleton = if N::is_top_level() { + let none_singleton = if L::is_top_level() { Singleton::new( self.location.clone(), HfPlusNode::Persist(Box::new(core_ir)), @@ -315,29 +321,29 @@ impl<'a, T, W, N: Location<'a>> Optional { } } -impl<'a, T, N: Location<'a>> Optional { - pub fn continue_if(self, signal: Optional) -> Optional { +impl<'a, T, L: Location<'a>> Optional { + pub fn continue_if(self, signal: Optional) -> Optional { self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d)) } - pub fn continue_unless(self, other: Optional) -> Optional { + pub fn continue_unless(self, other: Optional) -> Optional { self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) } - pub fn then(self, value: Singleton) -> Optional { + pub fn then(self, value: Singleton) -> Optional { value.continue_if(self) } } -impl<'a, T, B, N: Location<'a> + NoTick> Optional { - pub fn latest_tick(self, tick: &Tick) -> Optional> { +impl<'a, T, L: Location<'a> + NoTick, B> Optional { + pub fn latest_tick(self, tick: &Tick) -> Optional, Bounded> { Optional::new( tick.clone(), HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), ) } - pub fn tick_samples(self) -> Stream { + pub fn tick_samples(self) -> Stream { let tick = self.location.tick(); self.latest_tick(&tick).all_ticks() } @@ -345,7 +351,7 @@ impl<'a, T, B, N: Location<'a> + NoTick> Optional { pub fn sample_every( self, interval: impl Quoted<'a, std::time::Duration> + Copy + 'a, - ) -> Stream { + ) -> Stream { let samples = self.location.source_interval(interval); let tick = self.location.tick(); @@ -355,36 +361,36 @@ impl<'a, T, B, N: Location<'a> + NoTick> Optional { } } -impl<'a, T, N: Location<'a>> Optional> { - pub fn all_ticks(self) -> Stream { +impl<'a, T, L: Location<'a>> Optional, Bounded> { + pub fn all_ticks(self) -> Stream { Stream::new( self.location.outer().clone(), HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } - pub fn latest(self) -> Optional { + pub fn latest(self) -> Optional { Optional::new( self.location.outer().clone(), HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } - pub fn defer_tick(self) -> Optional> { + pub fn defer_tick(self) -> Optional, Bounded> { Optional::new( self.location, HfPlusNode::DeferTick(Box::new(self.ir_node.into_inner())), ) } - pub fn persist(self) -> Stream> { + pub fn persist(self) -> Stream, Bounded> { Stream::new( self.location, HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } - pub fn delta(self) -> Optional> { + pub fn delta(self) -> Optional, Bounded> { Optional::new( self.location, HfPlusNode::Delta(Box::new(self.ir_node.into_inner())), diff --git a/hydroflow_plus/src/singleton.rs b/hydroflow_plus/src/singleton.rs index b153a0947152..394aae8c0fa2 100644 --- a/hydroflow_plus/src/singleton.rs +++ b/hydroflow_plus/src/singleton.rs @@ -14,15 +14,15 @@ use crate::location::{check_matching_location, Location, LocationId, NoTick, Tic use crate::stream::{Bounded, Unbounded}; use crate::{Optional, Stream}; -pub struct Singleton { - pub(crate) location: N, +pub struct Singleton { + pub(crate) location: L, pub(crate) ir_node: RefCell, - _phantom: PhantomData<(T, N, W)>, + _phantom: PhantomData<(T, L, B)>, } -impl<'a, T, W, N: Location<'a>> Singleton { - pub(crate) fn new(location: N, ir_node: HfPlusNode) -> Self { +impl<'a, T, L: Location<'a>, B> Singleton { + pub(crate) fn new(location: L, ir_node: HfPlusNode) -> Self { Singleton { location, ir_node: RefCell::new(ir_node), @@ -35,24 +35,24 @@ impl<'a, T, W, N: Location<'a>> Singleton { } } -impl<'a, T, N: Location<'a>> From> for Singleton { - fn from(singleton: Singleton) -> Self { +impl<'a, T, L: Location<'a>> From> for Singleton { + fn from(singleton: Singleton) -> Self { Singleton::new(singleton.location, singleton.ir_node.into_inner()) } } -impl<'a, T, N: Location<'a>> DeferTick for Singleton> { +impl<'a, T, L: Location<'a>> DeferTick for Singleton, Bounded> { fn defer_tick(self) -> Self { Singleton::defer_tick(self) } } -impl<'a, T, N: Location<'a>> CycleCollectionWithInitial<'a, TickCycle> - for Singleton> +impl<'a, T, L: Location<'a>> CycleCollectionWithInitial<'a, TickCycle> + for Singleton, Bounded> { - type Location = Tick; + type Location = Tick; - fn create_source(ident: syn::Ident, initial: Self, location: Tick) -> Self { + fn create_source(ident: syn::Ident, initial: Self, location: Tick) -> Self { let location_id = location.id(); Singleton::new( location, @@ -67,7 +67,7 @@ impl<'a, T, N: Location<'a>> CycleCollectionWithInitial<'a, TickCycle> } } -impl<'a, T, N: Location<'a>> CycleComplete<'a, TickCycle> for Singleton> { +impl<'a, T, L: Location<'a>> CycleComplete<'a, TickCycle> for Singleton, Bounded> { fn complete(self, ident: syn::Ident) { self.location .flow_state() @@ -83,10 +83,10 @@ impl<'a, T, N: Location<'a>> CycleComplete<'a, TickCycle> for Singleton> CycleCollection<'a, ForwardRef> for Singleton> { - type Location = Tick; +impl<'a, T, L: Location<'a>> CycleCollection<'a, ForwardRef> for Singleton, Bounded> { + type Location = Tick; - fn create_source(ident: syn::Ident, location: Tick) -> Self { + fn create_source(ident: syn::Ident, location: Tick) -> Self { let location_id = location.id(); Singleton::new( location, @@ -98,7 +98,7 @@ impl<'a, T, N: Location<'a>> CycleCollection<'a, ForwardRef> for Singleton> CycleComplete<'a, ForwardRef> for Singleton> { +impl<'a, T, L: Location<'a>> CycleComplete<'a, ForwardRef> for Singleton, Bounded> { fn complete(self, ident: syn::Ident) { self.location .flow_state() @@ -114,7 +114,7 @@ impl<'a, T, N: Location<'a>> CycleComplete<'a, ForwardRef> for Singleton> Clone for Singleton { +impl<'a, T: Clone, L: Location<'a>, B> Clone for Singleton { fn clone(&self) -> Self { if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) { let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder); @@ -138,13 +138,13 @@ impl<'a, T: Clone, W, N: Location<'a>> Clone for Singleton { } } -impl<'a, T, W, N: Location<'a>> Singleton { +impl<'a, T, L: Location<'a>, B> Singleton { // TODO(shadaj): this is technically incorrect; we should only return the first element of the stream - pub fn into_stream(self) -> Stream { + pub fn into_stream(self) -> Stream { Stream::new(self.location, self.ir_node.into_inner()) } - pub fn map U + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Singleton { + pub fn map U + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Singleton { Singleton::new( self.location, HfPlusNode::Map { @@ -157,7 +157,7 @@ impl<'a, T, W, N: Location<'a>> Singleton { pub fn flat_map, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Stream { + ) -> Stream { Stream::new( self.location, HfPlusNode::FlatMap { @@ -167,7 +167,7 @@ impl<'a, T, W, N: Location<'a>> Singleton { ) } - pub fn filter bool + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Optional { + pub fn filter bool + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Optional { Optional::new( self.location, HfPlusNode::Filter { @@ -180,7 +180,7 @@ impl<'a, T, W, N: Location<'a>> Singleton { pub fn filter_map Option + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Optional { + ) -> Optional { Optional::new( self.location, HfPlusNode::FilterMap { @@ -192,11 +192,11 @@ impl<'a, T, W, N: Location<'a>> Singleton { pub fn zip(self, other: Other) -> >::Out where - Self: ZipResult<'a, Other, Location = N>, + Self: ZipResult<'a, Other, Location = L>, { check_matching_location(&self.location, &Self::other_location(&other)); - if N::is_top_level() { + if L::is_top_level() { Self::make( self.location, HfPlusNode::Persist(Box::new(HfPlusNode::CrossSingleton( @@ -216,25 +216,25 @@ impl<'a, T, W, N: Location<'a>> Singleton { } } -impl<'a, T, N: Location<'a>> Singleton { - pub fn continue_if(self, signal: Optional) -> Optional { +impl<'a, T, L: Location<'a>> Singleton { + pub fn continue_if(self, signal: Optional) -> Optional { self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d)) } - pub fn continue_unless(self, other: Optional) -> Optional { + pub fn continue_unless(self, other: Optional) -> Optional { self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) } } -impl<'a, T, B, N: Location<'a> + NoTick> Singleton { - pub fn latest_tick(self, tick: &Tick) -> Singleton> { +impl<'a, T, L: Location<'a> + NoTick, B> Singleton { + pub fn latest_tick(self, tick: &Tick) -> Singleton, Bounded> { Singleton::new( tick.clone(), HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), ) } - pub fn tick_samples(self) -> Stream { + pub fn tick_samples(self) -> Stream { let tick = self.location.tick(); self.latest_tick(&tick).all_ticks() } @@ -242,7 +242,7 @@ impl<'a, T, B, N: Location<'a> + NoTick> Singleton { pub fn sample_every( self, interval: impl Quoted<'a, std::time::Duration> + Copy + 'a, - ) -> Stream { + ) -> Stream { let samples = self.location.source_interval(interval); let tick = self.location.tick(); @@ -252,36 +252,36 @@ impl<'a, T, B, N: Location<'a> + NoTick> Singleton { } } -impl<'a, T, N: Location<'a>> Singleton> { - pub fn all_ticks(self) -> Stream { +impl<'a, T, L: Location<'a>> Singleton, Bounded> { + pub fn all_ticks(self) -> Stream { Stream::new( self.location.outer().clone(), HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } - pub fn latest(self) -> Singleton { + pub fn latest(self) -> Singleton { Singleton::new( self.location.outer().clone(), HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } - pub fn defer_tick(self) -> Singleton> { + pub fn defer_tick(self) -> Singleton, Bounded> { Singleton::new( self.location, HfPlusNode::DeferTick(Box::new(self.ir_node.into_inner())), ) } - pub fn persist(self) -> Stream> { + pub fn persist(self) -> Stream, Bounded> { Stream::new( self.location, HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } - pub fn delta(self) -> Optional> { + pub fn delta(self) -> Optional, Bounded> { Optional::new( self.location, HfPlusNode::Delta(Box::new(self.ir_node.into_inner())), @@ -299,36 +299,36 @@ pub trait ZipResult<'a, Other> { fn make(location: Self::Location, ir_node: HfPlusNode) -> Self::Out; } -impl<'a, T, U: Clone, W, N: Location<'a>> ZipResult<'a, Singleton> for Singleton { - type Out = Singleton<(T, U), W, N>; - type Location = N; +impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Singleton> for Singleton { + type Out = Singleton<(T, U), L, B>; + type Location = L; - fn other_location(other: &Singleton) -> N { + fn other_location(other: &Singleton) -> L { other.location.clone() } - fn other_ir_node(other: Singleton) -> HfPlusNode { + fn other_ir_node(other: Singleton) -> HfPlusNode { other.ir_node.into_inner() } - fn make(location: N, ir_node: HfPlusNode) -> Self::Out { + fn make(location: L, ir_node: HfPlusNode) -> Self::Out { Singleton::new(location, ir_node) } } -impl<'a, T, U: Clone, W, N: Location<'a>> ZipResult<'a, Optional> for Singleton { - type Out = Optional<(T, U), W, N>; - type Location = N; +impl<'a, T, U: Clone, L: Location<'a>, B> ZipResult<'a, Optional> for Singleton { + type Out = Optional<(T, U), L, B>; + type Location = L; - fn other_location(other: &Optional) -> N { + fn other_location(other: &Optional) -> L { other.location.clone() } - fn other_ir_node(other: Optional) -> HfPlusNode { + fn other_ir_node(other: Optional) -> HfPlusNode { other.ir_node.into_inner() } - fn make(location: N, ir_node: HfPlusNode) -> Self::Out { + fn make(location: L, ir_node: HfPlusNode) -> Self::Out { Optional::new(location, ir_node) } } diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index 3f1e1803cc94..f85eb32aa2dc 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -31,36 +31,36 @@ pub enum Unbounded {} /// to be complete in finite time. pub enum Bounded {} -/// An infinite stream of elements of type `T`. +/// An ordered sequence stream of elements of type `T`. /// /// Type Parameters: /// - `T`: the type of elements in the stream +/// - `L`: the location where the stream is being materialized /// - `B`: the boundedness of the stream, which is either [`Bounded`] /// or [`Unbounded`] -/// - `N`: the type of the node that the stream is materialized on -pub struct Stream { - location: N, +pub struct Stream { + location: L, pub(crate) ir_node: RefCell, - _phantom: PhantomData<(T, B, N)>, + _phantom: PhantomData<(T, L, B)>, } -impl<'a, T, W, N: Location<'a>> Stream { +impl<'a, T, L: Location<'a>, B> Stream { fn location_kind(&self) -> LocationId { self.location.id() } } -impl<'a, T, N: Location<'a>> DeferTick for Stream> { +impl<'a, T, L: Location<'a>> DeferTick for Stream, Bounded> { fn defer_tick(self) -> Self { Stream::defer_tick(self) } } -impl<'a, T, N: Location<'a>> CycleCollection<'a, TickCycle> for Stream> { - type Location = Tick; +impl<'a, T, L: Location<'a>> CycleCollection<'a, TickCycle> for Stream, Bounded> { + type Location = Tick; - fn create_source(ident: syn::Ident, location: Tick) -> Self { + fn create_source(ident: syn::Ident, location: Tick) -> Self { let location_id = location.id(); Stream::new( location, @@ -72,7 +72,7 @@ impl<'a, T, N: Location<'a>> CycleCollection<'a, TickCycle> for Stream> CycleComplete<'a, TickCycle> for Stream> { +impl<'a, T, L: Location<'a>> CycleComplete<'a, TickCycle> for Stream, Bounded> { fn complete(self, ident: syn::Ident) { self.location .flow_state() @@ -88,10 +88,10 @@ impl<'a, T, N: Location<'a>> CycleComplete<'a, TickCycle> for Stream + NoTick> CycleCollection<'a, ForwardRef> for Stream { - type Location = N; +impl<'a, T, L: Location<'a> + NoTick, B> CycleCollection<'a, ForwardRef> for Stream { + type Location = L; - fn create_source(ident: syn::Ident, location: N) -> Self { + fn create_source(ident: syn::Ident, location: L) -> Self { let location_id = location.id(); Stream::new( location, @@ -103,7 +103,7 @@ impl<'a, T, W, N: Location<'a> + NoTick> CycleCollection<'a, ForwardRef> for Str } } -impl<'a, T, W, N: Location<'a> + NoTick> CycleComplete<'a, ForwardRef> for Stream { +impl<'a, T, L: Location<'a> + NoTick, B> CycleComplete<'a, ForwardRef> for Stream { fn complete(self, ident: syn::Ident) { self.location .flow_state() @@ -119,8 +119,8 @@ impl<'a, T, W, N: Location<'a> + NoTick> CycleComplete<'a, ForwardRef> for Strea } } -impl<'a, T, W, N: Location<'a>> Stream { - pub(crate) fn new(location: N, ir_node: HfPlusNode) -> Self { +impl<'a, T, L: Location<'a>, B> Stream { + pub(crate) fn new(location: L, ir_node: HfPlusNode) -> Self { Stream { location, ir_node: RefCell::new(ir_node), @@ -129,7 +129,7 @@ impl<'a, T, W, N: Location<'a>> Stream { } } -impl<'a, T: Clone, W, N: Location<'a>> Clone for Stream { +impl<'a, T: Clone, L: Location<'a>, B> Clone for Stream { fn clone(&self) -> Self { if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) { let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder); @@ -153,8 +153,8 @@ impl<'a, T: Clone, W, N: Location<'a>> Clone for Stream { } } -impl<'a, T, W, N: Location<'a>> Stream { - pub fn map U + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Stream { +impl<'a, T, L: Location<'a>, B> Stream { + pub fn map U + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Stream { Stream::new( self.location, HfPlusNode::Map { @@ -164,7 +164,7 @@ impl<'a, T, W, N: Location<'a>> Stream { ) } - pub fn cloned(self) -> Stream + pub fn cloned(self) -> Stream where T: Clone, { @@ -174,7 +174,7 @@ impl<'a, T, W, N: Location<'a>> Stream { pub fn flat_map, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Stream { + ) -> Stream { Stream::new( self.location, HfPlusNode::FlatMap { @@ -184,14 +184,14 @@ impl<'a, T, W, N: Location<'a>> Stream { ) } - pub fn flatten(self) -> Stream + pub fn flatten(self) -> Stream where T: IntoIterator, { self.flat_map(q!(|d| d)) } - pub fn filter bool + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Stream { + pub fn filter bool + 'a>(self, f: impl IntoQuotedMut<'a, F>) -> Stream { Stream::new( self.location, HfPlusNode::Filter { @@ -204,7 +204,7 @@ impl<'a, T, W, N: Location<'a>> Stream { pub fn filter_map Option + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Stream { + ) -> Stream { Stream::new( self.location, HfPlusNode::FilterMap { @@ -216,12 +216,12 @@ impl<'a, T, W, N: Location<'a>> Stream { pub fn cross_singleton( self, - other: impl Into>, - ) -> Stream<(T, O), W, N> + other: impl Into>, + ) -> Stream<(T, O), L, B> where O: Clone, { - let other: Optional = other.into(); + let other: Optional = other.into(); check_matching_location(&self.location, &other.location); Stream::new( @@ -234,17 +234,17 @@ impl<'a, T, W, N: Location<'a>> Stream { } /// Allow this stream through if the other stream has elements, otherwise the output is empty. - pub fn continue_if(self, signal: Optional) -> Stream { + pub fn continue_if(self, signal: Optional) -> Stream { self.cross_singleton(signal.map(q!(|_u| ()))) .map(q!(|(d, _signal)| d)) } /// Allow this stream through if the other stream is empty, otherwise the output is empty. - pub fn continue_unless(self, other: Optional) -> Stream { + pub fn continue_unless(self, other: Optional) -> Stream { self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) } - pub fn cross_product(self, other: Stream) -> Stream<(T, O), W, N> + pub fn cross_product(self, other: Stream) -> Stream<(T, O), L, B> where T: Clone, O: Clone, @@ -260,7 +260,7 @@ impl<'a, T, W, N: Location<'a>> Stream { ) } - pub fn union(self, other: Stream) -> Stream { + pub fn union(self, other: Stream) -> Stream { check_matching_location(&self.location, &other.location); Stream::new( @@ -272,14 +272,14 @@ impl<'a, T, W, N: Location<'a>> Stream { ) } - pub fn enumerate(self) -> Stream<(usize, T), W, N> { + pub fn enumerate(self) -> Stream<(usize, T), L, B> { Stream::new( self.location, HfPlusNode::Enumerate(Box::new(self.ir_node.into_inner())), ) } - pub fn unique(self) -> Stream + pub fn unique(self) -> Stream where T: Eq + Hash, { @@ -289,7 +289,7 @@ impl<'a, T, W, N: Location<'a>> Stream { ) } - pub fn filter_not_in(self, other: Stream) -> Stream + pub fn filter_not_in(self, other: Stream) -> Stream where T: Eq + Hash, { @@ -304,12 +304,12 @@ impl<'a, T, W, N: Location<'a>> Stream { ) } - pub fn first(self) -> Optional { + pub fn first(self) -> Optional { Optional::new(self.location, self.ir_node.into_inner()) } - pub fn inspect(self, f: impl IntoQuotedMut<'a, F>) -> Stream { - if N::is_top_level() { + pub fn inspect(self, f: impl IntoQuotedMut<'a, F>) -> Stream { + if L::is_top_level() { Stream::new( self.location, HfPlusNode::Persist(Box::new(HfPlusNode::Inspect { @@ -332,14 +332,14 @@ impl<'a, T, W, N: Location<'a>> Stream { self, init: impl IntoQuotedMut<'a, I>, comb: impl IntoQuotedMut<'a, F>, - ) -> Singleton { + ) -> Singleton { let mut core = HfPlusNode::Fold { init: init.splice_fn0().into(), acc: comb.splice_fn2_borrow_mut().into(), input: Box::new(self.ir_node.into_inner()), }; - if N::is_top_level() { + if L::is_top_level() { // top-level (possibly unbounded) singletons are represented as // a stream which produces all values from all ticks every tick, // so Unpersist will always give the lastest aggregation @@ -352,20 +352,20 @@ impl<'a, T, W, N: Location<'a>> Stream { pub fn reduce( self, comb: impl IntoQuotedMut<'a, F>, - ) -> Optional { + ) -> Optional { let mut core = HfPlusNode::Reduce { f: comb.splice_fn2_borrow_mut().into(), input: Box::new(self.ir_node.into_inner()), }; - if N::is_top_level() { + if L::is_top_level() { core = HfPlusNode::Persist(Box::new(core)); } Optional::new(self.location, core) } - pub fn max(self) -> Optional + pub fn max(self) -> Optional where T: Ord, { @@ -376,7 +376,7 @@ impl<'a, T, W, N: Location<'a>> Stream { })) } - pub fn min(self) -> Optional + pub fn min(self) -> Optional where T: Ord, { @@ -387,13 +387,13 @@ impl<'a, T, W, N: Location<'a>> Stream { })) } - pub fn count(self) -> Singleton { + pub fn count(self) -> Singleton { self.fold(q!(|| 0usize), q!(|count, _| *count += 1)) } } -impl<'a, T, N: Location<'a>> Stream { - pub fn sort(self) -> Stream +impl<'a, T, L: Location<'a>> Stream { + pub fn sort(self) -> Stream where T: Ord, { @@ -404,8 +404,8 @@ impl<'a, T, N: Location<'a>> Stream { } } -impl<'a, K, V1, W, N: Location<'a>> Stream<(K, V1), W, N> { - pub fn join(self, n: Stream<(K, V2), W, N>) -> Stream<(K, (V1, V2)), W, N> +impl<'a, K, V1, L: Location<'a>, B> Stream<(K, V1), L, B> { + pub fn join(self, n: Stream<(K, V2), L, B>) -> Stream<(K, (V1, V2)), L, B> where K: Eq + Hash, { @@ -420,7 +420,7 @@ impl<'a, K, V1, W, N: Location<'a>> Stream<(K, V1), W, N> { ) } - pub fn anti_join(self, n: Stream) -> Stream<(K, V1), W, N> + pub fn anti_join(self, n: Stream) -> Stream<(K, V1), L, B> where K: Eq + Hash, { @@ -436,12 +436,12 @@ impl<'a, K, V1, W, N: Location<'a>> Stream<(K, V1), W, N> { } } -impl<'a, K: Eq + Hash, V, N: Location<'a>> Stream<(K, V), Bounded, Tick> { +impl<'a, K: Eq + Hash, V, L: Location<'a>> Stream<(K, V), Tick, Bounded> { pub fn fold_keyed A + 'a, F: Fn(&mut A, V) + 'a>( self, init: impl IntoQuotedMut<'a, I>, comb: impl IntoQuotedMut<'a, F>, - ) -> Stream<(K, A), Bounded, Tick> { + ) -> Stream<(K, A), Tick, Bounded> { Stream::new( self.location, HfPlusNode::FoldKeyed { @@ -455,7 +455,7 @@ impl<'a, K: Eq + Hash, V, N: Location<'a>> Stream<(K, V), Bounded, Tick> { pub fn reduce_keyed( self, comb: impl IntoQuotedMut<'a, F>, - ) -> Stream<(K, V), Bounded, Tick> { + ) -> Stream<(K, V), Tick, Bounded> { Stream::new( self.location, HfPlusNode::ReduceKeyed { @@ -466,15 +466,15 @@ impl<'a, K: Eq + Hash, V, N: Location<'a>> Stream<(K, V), Bounded, Tick> { } } -impl<'a, T, W, N: Location<'a> + NoTick> Stream { - pub fn tick_batch(self, tick: &Tick) -> Stream> { +impl<'a, T, L: Location<'a> + NoTick, B> Stream { + pub fn tick_batch(self, tick: &Tick) -> Stream, Bounded> { Stream::new( tick.clone(), HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), ) } - pub fn tick_prefix(self, tick: &Tick) -> Stream> + pub fn tick_prefix(self, tick: &Tick) -> Stream, Bounded> where T: Clone, { @@ -484,7 +484,7 @@ impl<'a, T, W, N: Location<'a> + NoTick> Stream { pub fn sample_every( self, interval: impl Quoted<'a, std::time::Duration> + Copy + 'a, - ) -> Stream { + ) -> Stream { let samples = self.location.source_interval(interval); let tick = self.location.tick(); self.tick_batch(&tick) @@ -519,15 +519,15 @@ impl<'a, T, W, N: Location<'a> + NoTick> Stream { } } -impl<'a, T, N: Location<'a>> Stream> { - pub fn all_ticks(self) -> Stream { +impl<'a, T, L: Location<'a>> Stream, Bounded> { + pub fn all_ticks(self) -> Stream { Stream::new( self.location.outer().clone(), HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), ) } - pub fn persist(self) -> Stream> + pub fn persist(self) -> Stream, Bounded> where T: Clone, { @@ -537,14 +537,14 @@ impl<'a, T, N: Location<'a>> Stream> { ) } - pub fn defer_tick(self) -> Stream> { + pub fn defer_tick(self) -> Stream, Bounded> { Stream::new( self.location, HfPlusNode::DeferTick(Box::new(self.ir_node.into_inner())), ) } - pub fn delta(self) -> Stream> { + pub fn delta(self) -> Stream, Bounded> { Stream::new( self.location, HfPlusNode::Delta(Box::new(self.ir_node.into_inner())), @@ -593,13 +593,13 @@ pub(super) fn deserialize_bincode(tagged: Option } } -impl<'a, T, W, N: Location<'a> + NoTick> Stream { +impl<'a, T, L: Location<'a> + NoTick, B> Stream { pub fn decouple_process( self, other: &Process<'a, P2>, - ) -> Stream> + ) -> Stream, Unbounded> where - N: CanSend<'a, Process<'a, P2>, In = T, Out = T>, + L: CanSend<'a, Process<'a, P2>, In = T, Out = T>, T: Clone + Serialize + DeserializeOwned, { self.send_bincode::, T>(other) @@ -608,9 +608,9 @@ impl<'a, T, W, N: Location<'a> + NoTick> Stream { pub fn decouple_cluster( self, other: &Cluster<'a, C2>, - ) -> Stream> + ) -> Stream, Unbounded> where - N: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)>, + L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)>, T: Clone + Serialize + DeserializeOwned, { let self_node_id = match self.location_kind() { @@ -625,17 +625,17 @@ impl<'a, T, W, N: Location<'a> + NoTick> Stream { .send_bincode_interleaved(other) } - pub fn send_bincode, CoreType>( + pub fn send_bincode, CoreType>( self, - other: &N2, - ) -> Stream, Unbounded, N2> + other: &L2, + ) -> Stream, L2, Unbounded> where - N: CanSend<'a, N2, In = T>, + L: CanSend<'a, L2, In = T>, CoreType: Serialize + DeserializeOwned, { - let serialize_pipeline = Some(serialize_bincode::(N::is_demux())); + let serialize_pipeline = Some(serialize_bincode::(L::is_demux())); - let deserialize_pipeline = Some(deserialize_bincode::(N::tagged_type())); + let deserialize_pipeline = Some(deserialize_bincode::(L::tagged_type())); Stream::new( other.clone(), @@ -652,16 +652,16 @@ impl<'a, T, W, N: Location<'a> + NoTick> Stream { ) } - pub fn send_bincode_external( + pub fn send_bincode_external( self, - other: &ExternalProcess, - ) -> ExternalBincodeStream> + other: &ExternalProcess, + ) -> ExternalBincodeStream> where - N: CanSend<'a, ExternalProcess<'a, N2>, In = T, Out = CoreType>, + L: CanSend<'a, ExternalProcess<'a, L2>, In = T, Out = CoreType>, CoreType: Serialize + DeserializeOwned, // for now, we restirct Out to be CoreType, which means no tagged cluster -> external { - let serialize_pipeline = Some(serialize_bincode::(N::is_demux())); + let serialize_pipeline = Some(serialize_bincode::(L::is_demux())); let mut flow_state_borrow = self.location.flow_state().borrow_mut(); @@ -693,9 +693,9 @@ impl<'a, T, W, N: Location<'a> + NoTick> Stream { } } - pub fn send_bytes>(self, other: &N2) -> Stream, Unbounded, N2> + pub fn send_bytes>(self, other: &L2) -> Stream, L2, Unbounded> where - N: CanSend<'a, N2, In = T>, + L: CanSend<'a, L2, In = T>, { let root = get_this_crate(); Stream::new( @@ -707,7 +707,7 @@ impl<'a, T, W, N: Location<'a> + NoTick> Stream { to_key: None, serialize_pipeline: None, instantiate_fn: DebugInstantiate::Building(), - deserialize_pipeline: if let Some(c_type) = N::tagged_type() { + deserialize_pipeline: if let Some(c_type) = L::tagged_type() { Some( parse_quote!(map(|(id, b)| (#root::ClusterId<#c_type>::from_raw(id), b.unwrap().freeze()))), ) @@ -719,9 +719,9 @@ impl<'a, T, W, N: Location<'a> + NoTick> Stream { ) } - pub fn send_bytes_external(self, other: &ExternalProcess) -> ExternalBytesPort + pub fn send_bytes_external(self, other: &ExternalProcess) -> ExternalBytesPort where - N: CanSend<'a, ExternalProcess<'a, N2>, In = T, Out = Bytes>, + L: CanSend<'a, ExternalProcess<'a, L2>, In = T, Out = Bytes>, { let mut flow_state_borrow = self.location.flow_state().borrow_mut(); let external_key = flow_state_borrow.next_external_out; @@ -751,33 +751,33 @@ impl<'a, T, W, N: Location<'a> + NoTick> Stream { } } - pub fn send_bincode_interleaved, Tag, CoreType>( + pub fn send_bincode_interleaved, Tag, CoreType>( self, - other: &N2, - ) -> Stream + other: &L2, + ) -> Stream where - N: CanSend<'a, N2, In = T, Out = (Tag, CoreType)>, + L: CanSend<'a, L2, In = T, Out = (Tag, CoreType)>, CoreType: Serialize + DeserializeOwned, { - self.send_bincode::(other).map(q!(|(_, b)| b)) + self.send_bincode::(other).map(q!(|(_, b)| b)) } - pub fn send_bytes_interleaved, Tag>( + pub fn send_bytes_interleaved, Tag>( self, - other: &N2, - ) -> Stream + other: &L2, + ) -> Stream where - N: CanSend<'a, N2, In = T, Out = (Tag, Bytes)>, + L: CanSend<'a, L2, In = T, Out = (Tag, Bytes)>, { - self.send_bytes::(other).map(q!(|(_, b)| b)) + self.send_bytes::(other).map(q!(|(_, b)| b)) } pub fn broadcast_bincode( self, other: &Cluster<'a, C2>, - ) -> Stream, Unbounded, Cluster<'a, C2>> + ) -> Stream, Cluster<'a, C2>, Unbounded> where - N: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)>, + L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)>, T: Clone + Serialize + DeserializeOwned, { let ids = other.members(); @@ -792,9 +792,9 @@ impl<'a, T, W, N: Location<'a> + NoTick> Stream { pub fn broadcast_bincode_interleaved( self, other: &Cluster<'a, C2>, - ) -> Stream> + ) -> Stream, Unbounded> where - N: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)> + 'a, + L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, T)> + 'a, T: Clone + Serialize + DeserializeOwned, { self.broadcast_bincode(other).map(q!(|(_, b)| b)) @@ -803,9 +803,9 @@ impl<'a, T, W, N: Location<'a> + NoTick> Stream { pub fn broadcast_bytes( self, other: &Cluster<'a, C2>, - ) -> Stream, Unbounded, Cluster<'a, C2>> + ) -> Stream, Cluster<'a, C2>, Unbounded> where - N: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)> + 'a, + L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T)> + 'a, T: Clone, { let ids = other.members(); @@ -820,9 +820,9 @@ impl<'a, T, W, N: Location<'a> + NoTick> Stream { pub fn broadcast_bytes_interleaved( self, other: &Cluster<'a, C2>, - ) -> Stream> + ) -> Stream, Unbounded> where - N: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, Bytes)> + L: CanSend<'a, Cluster<'a, C2>, In = (ClusterId, T), Out = (Tag, Bytes)> + 'a, T: Clone, { diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index a5d14f794063..85b9e4c06940 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -62,15 +62,15 @@ struct P2b

{ pub fn paxos_core<'a, P: PaxosPayload, R>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, - r_to_acceptors_checkpoint: Stream<(ClusterId, usize), Unbounded, Cluster<'a, Acceptor>>, - c_to_proposers: Stream>, + r_to_acceptors_checkpoint: Stream<(ClusterId, usize), Cluster<'a, Acceptor>, Unbounded>, + c_to_proposers: Stream, Unbounded>, f: usize, i_am_leader_send_timeout: u64, i_am_leader_check_timeout: u64, i_am_leader_check_timeout_delay_multiplier: usize, ) -> ( - Stream<(), Unbounded, Cluster<'a, Proposer>>, - Stream<(usize, Option

), Unbounded, Cluster<'a, Proposer>>, + Stream<(), Cluster<'a, Proposer>, Unbounded>, + Stream<(usize, Option

), Cluster<'a, Proposer>, Unbounded>, ) { proposers .source_iter(q!(["Proposers say hello"])) @@ -154,13 +154,13 @@ fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( i_am_leader_send_timeout: u64, i_am_leader_check_timeout: u64, i_am_leader_check_timeout_delay_multiplier: usize, - p_received_p2b_ballots: Stream>, - a_log: Singleton>>, + p_received_p2b_ballots: Stream, Unbounded>, + a_log: Singleton>, Bounded>, ) -> ( - Singleton>>, - Optional>>, - Stream, Bounded, Tick>>, - Singleton>>, + Singleton>, Bounded>, + Optional>, Bounded>, + Stream, Tick>, Bounded>, + Singleton>, Bounded>, ) { let (a_to_proposers_p1b_complete_cycle, a_to_proposers_p1b_forward_ref) = proposers.forward_ref::, _, _>>(); @@ -223,10 +223,10 @@ fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>( // Proposer logic to calculate the largest ballot received so far. fn p_max_ballot<'a>( proposers: &Cluster<'a, Proposer>, - p_received_p1b_ballots: Stream>, - p_received_p2b_ballots: Stream>, - p_to_proposers_i_am_leader: Stream>, -) -> Singleton> { + p_received_p1b_ballots: Stream, Unbounded>, + p_received_p2b_ballots: Stream, Unbounded>, + p_to_proposers_i_am_leader: Stream, Unbounded>, +) -> Singleton, Unbounded> { p_received_p1b_ballots .union(p_received_p2b_ballots) .union(p_to_proposers_i_am_leader) @@ -242,10 +242,10 @@ fn p_max_ballot<'a>( fn p_ballot_calc<'a>( proposers: &Cluster<'a, Proposer>, proposer_tick: &Tick>, - p_received_max_ballot: Singleton>>, + p_received_max_ballot: Singleton>, Bounded>, ) -> ( - Singleton>>, - Optional<(Ballot, u32), Bounded, Tick>>, + Singleton>, Bounded>, + Optional<(Ballot, u32), Tick>, Bounded>, ) { let p_id = proposers.self_id(); let (p_ballot_num_complete_cycle, p_ballot_num) = @@ -285,10 +285,10 @@ fn p_ballot_calc<'a>( fn p_leader_expired<'a>( proposer_tick: &Tick>, - p_to_proposers_i_am_leader: Stream>, - p_is_leader: Optional>>, + p_to_proposers_i_am_leader: Stream, Unbounded>, + p_is_leader: Optional>, Bounded>, i_am_leader_check_timeout: u64, // How often to check if heartbeat expired -) -> Optional, Bounded, Tick>> { +) -> Optional, Tick>, Bounded> { let p_latest_received_i_am_leader = p_to_proposers_i_am_leader.clone().fold( q!(|| None), q!(|latest, _| { @@ -314,14 +314,14 @@ fn p_leader_expired<'a>( fn p_leader_heartbeat<'a>( proposers: &Cluster<'a, Proposer>, proposer_tick: &Tick>, - p_is_leader: Optional>>, - p_ballot_num: Singleton>>, + p_is_leader: Optional>, Bounded>, + p_ballot_num: Singleton>, Bounded>, i_am_leader_send_timeout: u64, // How often to heartbeat i_am_leader_check_timeout: u64, // How often to check if heartbeat expired i_am_leader_check_timeout_delay_multiplier: usize, /* Initial delay, multiplied by proposer pid, to stagger proposers checking for timeouts */ ) -> ( - Stream>, - Optional, Bounded, Tick>>, + Stream, Unbounded>, + Optional, Tick>, Bounded>, ) { let p_id = proposers.self_id(); let p_to_proposers_i_am_leader = p_is_leader @@ -359,11 +359,11 @@ fn p_leader_heartbeat<'a>( // Proposer logic to send "I am leader" messages periodically to other proposers, or send p1a to acceptors if other leaders expired. fn p_p1a<'a>( - p_ballot_num: Singleton>>, - p_trigger_election: Optional, Bounded, Tick>>, + p_ballot_num: Singleton>, Bounded>, + p_trigger_election: Optional, Tick>, Bounded>, proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, -) -> Stream> { +) -> Stream, Unbounded> { let p_id = proposers.self_id(); p_trigger_election @@ -382,12 +382,12 @@ fn p_p1a<'a>( #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>( acceptor_tick: &Tick>, - p_to_acceptors_p1a: Stream>, - a_log: Singleton>>, + p_to_acceptors_p1a: Stream, Unbounded>, + a_log: Singleton>, Bounded>, proposers: &Cluster<'a, Proposer>, ) -> ( - Singleton>>, - Stream, Unbounded, Cluster<'a, Proposer>>, + Singleton>, Bounded>, + Stream, Cluster<'a, Proposer>, Unbounded>, ) { let p_to_acceptors_p1a = p_to_acceptors_p1a.tick_batch(acceptor_tick); let a_max_ballot = p_to_acceptors_p1a @@ -424,13 +424,13 @@ fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>( fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>( proposers: &Cluster<'a, Proposer>, proposer_tick: &Tick>, - a_to_proposers_p1b: Stream, Unbounded, Cluster<'a, Proposer>>, - p_ballot_num: Singleton>>, - p_has_largest_ballot: Optional<(Ballot, u32), Bounded, Tick>>, + a_to_proposers_p1b: Stream, Cluster<'a, Proposer>, Unbounded>, + p_ballot_num: Singleton>, Bounded>, + p_has_largest_ballot: Optional<(Ballot, u32), Tick>, Bounded>, f: usize, ) -> ( - Optional>>, - Stream, Bounded, Tick>>, + Optional>, Bounded>, + Stream, Tick>, Bounded>, ) { let p_id = proposers.self_id(); let p_relevant_p1bs = a_to_proposers_p1b @@ -457,13 +457,13 @@ fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>( #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn recommit_after_leader_election<'a, P: PaxosPayload>( proposers: &Cluster<'a, Proposer>, - p_relevant_p1bs: Stream>>, Bounded, Tick>>, - p_ballot_num: Singleton>>, + p_relevant_p1bs: Stream>>, Tick>, Bounded>, + p_ballot_num: Singleton>, Bounded>, f: usize, ) -> ( - Stream, Bounded, Tick>>, - Optional>>, - Stream, Bounded, Tick>>, + Stream, Tick>, Bounded>, + Optional>, Bounded>, + Stream, Tick>, Bounded>, ) { let p_id = proposers.self_id(); @@ -541,21 +541,21 @@ fn sequence_payload<'a, P: PaxosPayload, R>( acceptors: &Cluster<'a, Acceptor>, proposer_tick: &Tick>, acceptor_tick: &Tick>, - c_to_proposers: Stream>, - r_to_acceptors_checkpoint: Stream<(ClusterId, usize), Unbounded, Cluster<'a, Acceptor>>, + c_to_proposers: Stream, Unbounded>, + r_to_acceptors_checkpoint: Stream<(ClusterId, usize), Cluster<'a, Acceptor>, Unbounded>, - p_ballot_num: Singleton>>, - p_is_leader: Optional>>, - p_max_slot: Optional>>, + p_ballot_num: Singleton>, Bounded>, + p_is_leader: Optional>, Bounded>, + p_max_slot: Optional>, Bounded>, - p_log_to_recommit: Stream, Bounded, Tick>>, + p_log_to_recommit: Stream, Tick>, Bounded>, f: usize, - a_max_ballot: Singleton>>, + a_max_ballot: Singleton>, Bounded>, ) -> ( - Stream<(usize, Option

), Unbounded, Cluster<'a, Proposer>>, - Singleton<(Option, HashMap>), Bounded, Tick>>, - Stream, Unbounded, Cluster<'a, Proposer>>, + Stream<(usize, Option

), Cluster<'a, Proposer>, Unbounded>, + Singleton<(Option, HashMap>), Tick>, Bounded>, + Stream, Cluster<'a, Proposer>, Unbounded>, ) { let p_to_acceptors_p2a = p_p2a( proposers, @@ -595,13 +595,13 @@ enum CheckpointOrP2a

{ fn p_p2a<'a, P: PaxosPayload>( proposers: &Cluster<'a, Proposer>, proposer_tick: &Tick>, - p_max_slot: Optional>>, - c_to_proposers: Stream>, - p_ballot_num: Singleton>>, - p_log_to_recommit: Stream, Bounded, Tick>>, - p_is_leader: Optional>>, + p_max_slot: Optional>, Bounded>, + c_to_proposers: Stream, Unbounded>, + p_ballot_num: Singleton>, Bounded>, + p_log_to_recommit: Stream, Tick>, Bounded>, + p_is_leader: Optional>, Bounded>, acceptors: &Cluster<'a, Acceptor>, -) -> Stream, Unbounded, Cluster<'a, Acceptor>> { +) -> Stream, Cluster<'a, Acceptor>, Unbounded> { let p_id = proposers.self_id(); let (p_next_slot_complete_cycle, p_next_slot) = proposer_tick.cycle::>(); let p_next_slot_after_reconciling_p1bs = p_max_slot @@ -649,14 +649,14 @@ fn p_p2a<'a, P: PaxosPayload>( #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn acceptor_p2<'a, P: PaxosPayload, R>( acceptor_tick: &Tick>, - a_max_ballot: Singleton>>, - p_to_acceptors_p2a: Stream, Unbounded, Cluster<'a, Acceptor>>, - r_to_acceptors_checkpoint: Stream<(ClusterId, usize), Unbounded, Cluster<'a, Acceptor>>, + a_max_ballot: Singleton>, Bounded>, + p_to_acceptors_p2a: Stream, Cluster<'a, Acceptor>, Unbounded>, + r_to_acceptors_checkpoint: Stream<(ClusterId, usize), Cluster<'a, Acceptor>, Unbounded>, proposers: &Cluster<'a, Proposer>, f: usize, ) -> ( - Singleton<(Option, HashMap>), Bounded, Tick>>, - Stream, Unbounded, Cluster<'a, Proposer>>, + Singleton<(Option, HashMap>), Tick>, Bounded>, + Stream, Cluster<'a, Proposer>, Unbounded>, ) { let p_to_acceptors_p2a_batch = p_to_acceptors_p2a.tick_batch(acceptor_tick); @@ -747,9 +747,9 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( fn p_p2b<'a, P: PaxosPayload>( proposer_tick: &Tick>, - a_to_proposers_p2b: Stream, Unbounded, Cluster<'a, Proposer>>, + a_to_proposers_p2b: Stream, Cluster<'a, Proposer>, Unbounded>, f: usize, -) -> Stream<(usize, Option

), Unbounded, Cluster<'a, Proposer>> { +) -> Stream<(usize, Option

), Cluster<'a, Proposer>, Unbounded> { let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposer_tick.cycle(); let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposer_tick.cycle(); let p_p2b = a_to_proposers_p2b diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index e9c59925b274..e3db1458a809 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -72,17 +72,17 @@ pub fn paxos_bench<'a>( // Clients. All relations for clients will be prefixed with c. All ClientPayloads will contain the virtual client number as key and the client's machine ID (to string) as value. Expects p_to_clients_leader_elected containing Ballots whenever the leader is elected, and r_to_clients_payload_applied containing ReplicaPayloads whenever a payload is committed. Outputs (leader address, ClientPayload) when a new leader is elected or when the previous payload is committed. fn bench_client<'a>( clients: &Cluster<'a, Client>, - p_to_clients_leader_elected: Stream, Unbounded, Cluster<'a, Client>>, + p_to_clients_leader_elected: Stream, Cluster<'a, Client>, Unbounded>, transaction_cycle: impl FnOnce( Stream< (ClusterId, KvPayload>), - Unbounded, Cluster<'a, Client>, + Unbounded, >, ) -> Stream< (ClusterId, KvPayload>), - Unbounded, Cluster<'a, Client>, + Unbounded, >, num_clients_per_node: usize, median_latency_window_size: usize, diff --git a/hydroflow_plus_test/src/cluster/paxos_kv.rs b/hydroflow_plus_test/src/cluster/paxos_kv.rs index 840e30cc9af7..9aeef0d9b7c4 100644 --- a/hydroflow_plus_test/src/cluster/paxos_kv.rs +++ b/hydroflow_plus_test/src/cluster/paxos_kv.rs @@ -51,15 +51,15 @@ pub fn paxos_kv<'a, K: KvKey, V: KvValue>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, replicas: &Cluster<'a, Replica>, - c_to_proposers: Stream, Unbounded, Cluster<'a, Proposer>>, + c_to_proposers: Stream, Cluster<'a, Proposer>, Unbounded>, f: usize, i_am_leader_send_timeout: u64, i_am_leader_check_timeout: u64, i_am_leader_check_timeout_delay_multiplier: usize, checkpoint_frequency: usize, ) -> ( - Stream<(), Unbounded, Cluster<'a, Proposer>>, - Stream, Unbounded, Cluster<'a, Replica>>, + Stream<(), Cluster<'a, Proposer>, Unbounded>, + Stream, Cluster<'a, Replica>, Unbounded>, ) { let (r_to_acceptors_checkpoint_complete_cycle, r_to_acceptors_checkpoint) = replicas.forward_ref::>(); @@ -92,11 +92,11 @@ pub fn paxos_kv<'a, K: KvKey, V: KvValue>( #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] pub fn replica<'a, K: KvKey, V: KvValue>( replicas: &Cluster<'a, Replica>, - p_to_replicas: Stream, Unbounded, Cluster<'a, Replica>>, + p_to_replicas: Stream, Cluster<'a, Replica>, Unbounded>, checkpoint_frequency: usize, ) -> ( - Stream>, - Stream, Unbounded, Cluster<'a, Replica>>, + Stream, Unbounded>, + Stream, Cluster<'a, Replica>, Unbounded>, ) { let replica_tick = replicas.tick();