diff --git a/hydroflow_plus/src/builder/mod.rs b/hydroflow_plus/src/builder/mod.rs index 88ff13c56142..46b54dd93292 100644 --- a/hydroflow_plus/src/builder/mod.rs +++ b/hydroflow_plus/src/builder/mod.rs @@ -12,6 +12,7 @@ use quote::quote; use runtime_support::FreeVariable; use stageleft::*; +use crate::cycle::CycleCollection; use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource}; use crate::location::{Cluster, Location, LocationId, Process}; use crate::stream::{Bounded, NoTick, Tick, Unbounded}; @@ -277,50 +278,7 @@ impl<'a> FlowBuilder<'a> { ) } - #[allow(clippy::type_complexity)] - pub fn cycle( - &self, - on: &L, - ) -> (HfCycle<'a, T, W, NoTick, L>, Stream<'a, T, W, NoTick, L>) { - let next_id = { - let on_id = match on.id() { - LocationId::Process(id) => id, - LocationId::Cluster(id) => id, - }; - - let mut cycle_ids = self.cycle_ids.borrow_mut(); - let next_id_entry = cycle_ids.entry(on_id).or_default(); - - let id = *next_id_entry; - *next_id_entry += 1; - id - }; - - let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site()); - - ( - HfCycle { - ident: ident.clone(), - location_kind: on.id(), - ir_leaves: self.ir_leaves().clone(), - _phantom: PhantomData, - }, - Stream::new( - on.id(), - self.ir_leaves().clone(), - HfPlusNode::Persist(Box::new(HfPlusNode::CycleSource { - ident, - location_kind: on.id(), - })), - ), - ) - } - - #[allow(clippy::type_complexity)] - pub fn tick_cycle( - &self, - on: &L, - ) -> (HfCycle<'a, T, W, Tick, L>, Stream<'a, T, W, Tick, L>) { + pub fn cycle>(&self, on: &S::Location) -> (HfCycle<'a, S>, S) { let next_id = { let on_id = match on.id() { LocationId::Process(id) => id, @@ -340,18 +298,9 @@ impl<'a> FlowBuilder<'a> { ( HfCycle { ident: ident.clone(), - location_kind: on.id(), - ir_leaves: self.ir_leaves().clone(), _phantom: PhantomData, }, - Stream::new( - on.id(), - self.ir_leaves().clone(), - HfPlusNode::CycleSource { - ident, - location_kind: on.id(), - }, - ), + S::create_source(ident, self.ir_leaves.clone(), on.id()), ) } } diff --git a/hydroflow_plus/src/cycle.rs b/hydroflow_plus/src/cycle.rs index 66a8f68cf827..3076aacb0d82 100644 --- a/hydroflow_plus/src/cycle.rs +++ b/hydroflow_plus/src/cycle.rs @@ -1,44 +1,28 @@ use std::marker::PhantomData; use crate::builder::FlowLeaves; -use crate::ir::{HfPlusLeaf, HfPlusNode}; use crate::location::{Location, LocationId}; -use crate::stream::{NoTick, Tick}; -use crate::Stream; + +pub trait CycleCollection<'a> { + type Location: Location; + + fn create_source(ident: syn::Ident, ir_leaves: FlowLeaves<'a>, l: LocationId) -> Self; + + fn complete(self, ident: syn::Ident); +} /// Represents a fixpoint cycle in the graph that will be fulfilled /// by a stream that is not yet known. /// -/// See [`Stream`] for an explainer on the type parameters. -pub struct HfCycle<'a, T, W, C, N: Location> { +/// See [`crate::FlowBuilder`] for an explainer on the type parameters. +pub struct HfCycle<'a, S: CycleCollection<'a>> { pub(crate) ident: syn::Ident, - pub(crate) location_kind: LocationId, - pub(crate) ir_leaves: FlowLeaves<'a>, - pub(crate) _phantom: PhantomData<(N, &'a mut &'a (), T, W, C)>, -} - -impl<'a, T, W, N: Location> HfCycle<'a, T, W, Tick, N> { - pub fn complete(self, stream: Stream<'a, T, W, Tick, N>) { - let ident = self.ident; - - self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a cycle to a flow that has already been finalized. No cycles can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { - ident, - location_kind: self.location_kind, - input: Box::new(stream.ir_node.into_inner()), - }); - } + pub(crate) _phantom: PhantomData<(&'a mut &'a (), S)>, } -impl<'a, T, W, N: Location> HfCycle<'a, T, W, NoTick, N> { - pub fn complete(self, stream: Stream<'a, T, W, NoTick, N>) { +impl<'a, S: CycleCollection<'a>> HfCycle<'a, S> { + pub fn complete(self, stream: S) { let ident = self.ident; - - self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a cycle to a flow that has already been finalized. No cycles can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { - ident, - location_kind: self.location_kind, - input: Box::new(HfPlusNode::Unpersist( - Box::new(stream.ir_node.into_inner()) - )), - }); + S::complete(stream, ident) } } diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs index a805b333ddde..29299fe79851 100644 --- a/hydroflow_plus/src/lib.rs +++ b/hydroflow_plus/src/lib.rs @@ -19,7 +19,10 @@ pub mod runtime_support { } pub mod stream; -pub use stream::Stream; +pub use stream::{Bounded, NoTick, Stream, Tick, Unbounded}; + +pub mod singleton; +pub use singleton::{Optional, Singleton}; pub mod location; pub use location::{Cluster, Process}; diff --git a/hydroflow_plus/src/singleton.rs b/hydroflow_plus/src/singleton.rs new file mode 100644 index 000000000000..1d6109b6196d --- /dev/null +++ b/hydroflow_plus/src/singleton.rs @@ -0,0 +1,462 @@ +use std::cell::RefCell; +use std::marker::PhantomData; +use std::ops::Deref; +use std::rc::Rc; + +use stageleft::{q, IntoQuotedMut, Quoted}; + +use crate::builder::FlowLeaves; +use crate::cycle::CycleCollection; +use crate::ir::{HfPlusLeaf, HfPlusNode}; +use crate::location::{Location, LocationId}; +use crate::stream::{Bounded, NoTick, Tick, Unbounded}; +use crate::Stream; + +pub struct Singleton<'a, T, W, C, N: Location> { + pub(crate) location_kind: LocationId, + + ir_leaves: FlowLeaves<'a>, + pub(crate) ir_node: RefCell>, + + _phantom: PhantomData<(&'a mut &'a (), T, N, W, C)>, +} + +impl<'a, T, W, C, N: Location> Singleton<'a, T, W, C, N> { + pub(crate) fn new( + location_kind: LocationId, + ir_leaves: FlowLeaves<'a>, + ir_node: HfPlusNode<'a>, + ) -> Self { + Singleton { + location_kind, + ir_leaves, + ir_node: RefCell::new(ir_node), + _phantom: PhantomData, + } + } +} + +// this is not safe on the first tick +// impl <'a, T, W, N: Location> CycleCollection<'a, N, Tick> for Singleton<'a, T, W, Tick, N> { +// fn create_source( +// ident: syn::Ident, +// ir_leaves: FlowLeaves<'a>, +// l: &N +// ) -> Self { +// Singleton::new( +// l.id(), +// ir_leaves, +// HfPlusNode::CycleSource { +// ident, +// location_kind: l.id() +// } +// ) +// } + +// fn complete( +// self, +// ident: syn::Ident, +// ) { +// self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { +// ident, +// location_kind: self.location_kind, +// input: Box::new(self.ir_node.into_inner()), +// }); +// } +// } + +impl<'a, T: Clone, W, C, N: Location> Clone for Singleton<'a, T, W, C, N> { + fn clone(&self) -> Self { + if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) { + let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder); + *self.ir_node.borrow_mut() = HfPlusNode::Tee { + inner: Rc::new(RefCell::new(orig_ir_node)), + }; + } + + if let HfPlusNode::Tee { inner } = self.ir_node.borrow().deref() { + Singleton { + location_kind: self.location_kind, + ir_leaves: self.ir_leaves.clone(), + ir_node: HfPlusNode::Tee { + inner: inner.clone(), + } + .into(), + _phantom: PhantomData, + } + } else { + unreachable!() + } + } +} + +impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> { + pub fn map U + 'a>( + self, + f: impl IntoQuotedMut<'a, F>, + ) -> Singleton<'a, U, Bounded, Tick, N> { + Singleton::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Map { + f: f.splice().into(), + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn flat_map, F: Fn(T) -> I + 'a>( + self, + f: impl IntoQuotedMut<'a, F>, + ) -> Stream<'a, U, Bounded, Tick, N> { + Stream::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::FlatMap { + f: f.splice().into(), + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn filter bool + 'a>( + self, + f: impl IntoQuotedMut<'a, F>, + ) -> Optional<'a, T, Bounded, Tick, N> { + Optional::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Filter { + f: f.splice().into(), + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn filter_map Option + 'a>( + self, + f: impl IntoQuotedMut<'a, F>, + ) -> Optional<'a, U, Bounded, Tick, N> { + Optional::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::FilterMap { + f: f.splice().into(), + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn cross_singleton( + self, + other: impl Into>, + ) -> Optional<'a, (T, O), Bounded, Tick, N> + where + O: Clone, + { + let other: Optional<'a, O, Bounded, Tick, N> = other.into(); + if self.location_kind != other.location_kind { + panic!("cross_singleton must be called on streams on the same node"); + } + + Optional::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::CrossSingleton( + Box::new(self.ir_node.into_inner()), + Box::new(other.ir_node.into_inner()), + ), + ) + } + + pub fn continue_if( + self, + signal: Optional<'a, U, Bounded, Tick, N>, + ) -> Optional<'a, T, Bounded, Tick, N> { + self.cross_singleton(signal.map(q!(|_u| ()))) + .map(q!(|(d, _signal)| d)) + } + + pub fn continue_unless( + self, + other: Optional<'a, U, Bounded, Tick, N>, + ) -> Optional<'a, T, Bounded, Tick, N> { + self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) + } +} + +impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> { + pub fn all_ticks(self) -> Stream<'a, T, Unbounded, NoTick, N> { + Stream::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), + ) + } + + pub fn defer_tick(self) -> Singleton<'a, T, Bounded, Tick, N> { + Singleton::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::DeferTick(Box::new(self.ir_node.into_inner())), + ) + } + + pub fn persist(self) -> Stream<'a, T, Bounded, Tick, N> { + Stream::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), + ) + } + + pub fn delta(self) -> Optional<'a, T, Bounded, Tick, N> { + Optional::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Delta(Box::new(self.ir_node.into_inner())), + ) + } +} + +pub struct Optional<'a, T, W, C, N: Location> { + pub(crate) location_kind: LocationId, + + ir_leaves: FlowLeaves<'a>, + pub(crate) ir_node: RefCell>, + + _phantom: PhantomData<(&'a mut &'a (), T, N, W, C)>, +} + +impl<'a, T, W, C, N: Location> Optional<'a, T, W, C, N> { + pub(crate) fn new( + location_kind: LocationId, + ir_leaves: FlowLeaves<'a>, + ir_node: HfPlusNode<'a>, + ) -> Self { + Optional { + location_kind, + ir_leaves, + ir_node: RefCell::new(ir_node), + _phantom: PhantomData, + } + } + + pub fn some(singleton: Singleton<'a, T, W, C, N>) -> Self { + Optional::new( + singleton.location_kind, + singleton.ir_leaves, + singleton.ir_node.into_inner(), + ) + } +} + +impl<'a, T, W, N: Location> CycleCollection<'a> for Optional<'a, T, W, Tick, N> { + type Location = N; + + fn create_source(ident: syn::Ident, ir_leaves: FlowLeaves<'a>, l: LocationId) -> Self { + Optional::new( + l, + ir_leaves, + HfPlusNode::CycleSource { + ident, + location_kind: l, + }, + ) + } + + fn complete(self, ident: syn::Ident) { + self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { + ident, + location_kind: self.location_kind, + input: Box::new(self.ir_node.into_inner()), + }); + } +} + +impl<'a, T, W, C, N: Location> From> for Optional<'a, T, W, C, N> { + fn from(singleton: Singleton<'a, T, W, C, N>) -> Self { + Optional::some(singleton) + } +} + +impl<'a, T: Clone, W, C, N: Location> Clone for Optional<'a, T, W, C, N> { + fn clone(&self) -> Self { + if !matches!(self.ir_node.borrow().deref(), HfPlusNode::Tee { .. }) { + let orig_ir_node = self.ir_node.replace(HfPlusNode::Placeholder); + *self.ir_node.borrow_mut() = HfPlusNode::Tee { + inner: Rc::new(RefCell::new(orig_ir_node)), + }; + } + + if let HfPlusNode::Tee { inner } = self.ir_node.borrow().deref() { + Optional { + location_kind: self.location_kind, + ir_leaves: self.ir_leaves.clone(), + ir_node: HfPlusNode::Tee { + inner: inner.clone(), + } + .into(), + _phantom: PhantomData, + } + } else { + unreachable!() + } + } +} + +impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> { + pub fn into_stream(self) -> Stream<'a, T, Bounded, Tick, N> { + Stream::new( + self.location_kind, + self.ir_leaves, + self.ir_node.into_inner(), + ) + } + + pub fn map U + 'a>( + self, + f: impl IntoQuotedMut<'a, F>, + ) -> Optional<'a, U, Bounded, Tick, N> { + Optional::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Map { + f: f.splice().into(), + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn flat_map, F: Fn(T) -> I + 'a>( + self, + f: impl IntoQuotedMut<'a, F>, + ) -> Stream<'a, U, Bounded, Tick, N> { + Stream::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::FlatMap { + f: f.splice().into(), + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn filter bool + 'a>( + self, + f: impl IntoQuotedMut<'a, F>, + ) -> Optional<'a, T, Bounded, Tick, N> { + Optional::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Filter { + f: f.splice().into(), + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn filter_map Option + 'a>( + self, + f: impl IntoQuotedMut<'a, F>, + ) -> Optional<'a, U, Bounded, Tick, N> { + Optional::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::FilterMap { + f: f.splice().into(), + input: Box::new(self.ir_node.into_inner()), + }, + ) + } + + pub fn cross_singleton( + self, + other: impl Into>, + ) -> Optional<'a, (T, O), Bounded, Tick, N> + where + O: Clone, + { + let other: Optional<'a, O, Bounded, Tick, N> = other.into(); + if self.location_kind != other.location_kind { + panic!("cross_singleton must be called on streams on the same node"); + } + + Optional::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::CrossSingleton( + Box::new(self.ir_node.into_inner()), + Box::new(other.ir_node.into_inner()), + ), + ) + } + + pub fn continue_if( + self, + signal: Optional<'a, U, Bounded, Tick, N>, + ) -> Optional<'a, T, Bounded, Tick, N> { + self.cross_singleton(signal.map(q!(|_u| ()))) + .map(q!(|(d, _signal)| d)) + } + + pub fn continue_unless( + self, + other: Optional<'a, U, Bounded, Tick, N>, + ) -> Optional<'a, T, Bounded, Tick, N> { + self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) + } + + pub fn union( + self, + other: Optional<'a, T, Bounded, Tick, N>, + ) -> Optional<'a, T, Bounded, Tick, N> { + if self.location_kind != other.location_kind { + panic!("union must be called on streams on the same node"); + } + + // TODO(shadaj): this is technically unsafe; if both of these are Some and we call as_stream we'll get two elements + Optional::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Union( + Box::new(self.ir_node.into_inner()), + Box::new(other.ir_node.into_inner()), + ), + ) + } +} + +impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> { + pub fn all_ticks(self) -> Stream<'a, T, Unbounded, NoTick, N> { + Stream::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), + ) + } + + pub fn defer_tick(self) -> Optional<'a, T, Bounded, Tick, N> { + Optional::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::DeferTick(Box::new(self.ir_node.into_inner())), + ) + } + + pub fn persist(self) -> Stream<'a, T, Bounded, Tick, N> { + Stream::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), + ) + } + + pub fn delta(self) -> Optional<'a, T, Bounded, Tick, N> { + Optional::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Delta(Box::new(self.ir_node.into_inner())), + ) + } +} diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index 52f654275e29..bded1dc90ae7 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -15,9 +15,10 @@ use stageleft::{q, IntoQuotedMut, Quoted}; use syn::parse_quote; use crate::builder::{ClusterIds, FlowLeaves}; +use crate::cycle::CycleCollection; use crate::ir::{DebugInstantiate, HfPlusLeaf, HfPlusNode, HfPlusSource}; use crate::location::{CanSend, Location, LocationId}; -use crate::Cluster; +use crate::{Cluster, Optional, Singleton}; /// Marks the stream as being unbounded, which means that it is not /// guaranteed to be complete in finite time. @@ -52,6 +53,52 @@ pub struct Stream<'a, T, W, C, N: Location> { _phantom: PhantomData<(&'a mut &'a (), T, N, W, C)>, } +impl<'a, T, W, N: Location> CycleCollection<'a> for Stream<'a, T, W, Tick, N> { + type Location = N; + + fn create_source(ident: syn::Ident, ir_leaves: FlowLeaves<'a>, l: LocationId) -> Self { + Stream::new( + l, + ir_leaves, + HfPlusNode::CycleSource { + ident, + location_kind: l, + }, + ) + } + + fn complete(self, ident: syn::Ident) { + self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { + ident, + location_kind: self.location_kind, + input: Box::new(self.ir_node.into_inner()), + }); + } +} + +impl<'a, T, W, N: Location> CycleCollection<'a> for Stream<'a, T, W, NoTick, N> { + type Location = N; + + fn create_source(ident: syn::Ident, ir_leaves: FlowLeaves<'a>, l: LocationId) -> Self { + Stream::new( + l, + ir_leaves, + HfPlusNode::Persist(Box::new(HfPlusNode::CycleSource { + ident, + location_kind: l, + })), + ) + } + + fn complete(self, ident: syn::Ident) { + self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { + ident, + location_kind: self.location_kind, + input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), + }); + } +} + impl<'a, T, W, C, N: Location> Stream<'a, T, W, C, N> { pub(crate) fn new( location_kind: LocationId, @@ -151,11 +198,12 @@ impl<'a, T, W, C, N: Location> Stream<'a, T, W, C, N> { pub fn cross_singleton( self, - other: Stream<'a, O, Bounded, C, N>, + other: impl Into>, ) -> Stream<'a, (T, O), W, C, N> where O: Clone, { + let other: Optional<'a, O, Bounded, C, N> = other.into(); if self.location_kind != other.location_kind { panic!("cross_singleton must be called on streams on the same node"); } @@ -170,12 +218,6 @@ impl<'a, T, W, C, N: Location> Stream<'a, T, W, C, N> { ) } - /// Allow this stream through if the other stream has elements, otherwise the output is empty. - pub fn continue_if(self, signal: Stream<'a, U, Bounded, C, N>) -> Stream<'a, T, W, C, N> { - self.cross_singleton(signal.map(q!(|_u| ()))) - .map(q!(|(d, _signal)| d)) - } - // TODO(shadaj): should allow for differing windows, using strongest one pub fn cross_product(self, other: Stream<'a, O, W, C, N>) -> Stream<'a, (T, O), W, C, N> where @@ -258,12 +300,29 @@ impl<'a, T, N: Location> Stream<'a, T, Bounded, Tick, N> { ) } + pub fn first(self) -> Optional<'a, T, Bounded, Tick, N> { + Optional::new( + self.location_kind, + self.ir_leaves, + self.ir_node.into_inner(), + ) + } + + /// Allow this stream through if the other stream has elements, otherwise the output is empty. + pub fn continue_if( + self, + signal: Optional<'a, U, Bounded, Tick, N>, + ) -> Stream<'a, T, Bounded, Tick, N> { + self.cross_singleton(signal.map(q!(|_u| ()))) + .map(q!(|(d, _signal)| d)) + } + /// Allow this stream through if the other stream is empty, otherwise the output is empty. pub fn continue_unless( self, - other: Stream<'a, U, Bounded, Tick, N>, + other: Optional<'a, U, Bounded, Tick, N>, ) -> Stream<'a, T, Bounded, Tick, N> { - self.continue_if(other.count().filter(q!(|c| *c == 0))) + self.continue_if(other.into_stream().count().filter(q!(|c| *c == 0))) } pub fn enumerate(self) -> Stream<'a, (usize, T), Bounded, Tick, N> { @@ -278,8 +337,8 @@ impl<'a, T, N: Location> Stream<'a, T, Bounded, Tick, N> { self, init: impl IntoQuotedMut<'a, I>, comb: impl IntoQuotedMut<'a, F>, - ) -> Stream<'a, A, Bounded, Tick, N> { - Stream::new( + ) -> Singleton<'a, A, Bounded, Tick, N> { + Singleton::new( self.location_kind, self.ir_leaves, HfPlusNode::Fold { @@ -293,8 +352,8 @@ impl<'a, T, N: Location> Stream<'a, T, Bounded, Tick, N> { pub fn reduce( self, comb: impl IntoQuotedMut<'a, F>, - ) -> Stream<'a, T, Bounded, Tick, N> { - Stream::new( + ) -> Optional<'a, T, Bounded, Tick, N> { + Optional::new( self.location_kind, self.ir_leaves, HfPlusNode::Reduce { @@ -315,7 +374,7 @@ impl<'a, T, N: Location> Stream<'a, T, Bounded, Tick, N> { ) } - pub fn count(self) -> Stream<'a, usize, Bounded, Tick, N> { + pub fn count(self) -> Singleton<'a, usize, Bounded, Tick, N> { self.fold(q!(|| 0usize), q!(|count, _| *count += 1)) } @@ -388,7 +447,7 @@ impl<'a, T, N: Location> Stream<'a, T, Unbounded, NoTick, N> { }, ); - self.tick_batch().continue_if(samples).all_ticks() + self.tick_batch().continue_if(samples.first()).all_ticks() } } diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index eea9d371fa85..b79ef502550d 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -6,7 +6,6 @@ use std::time::{Duration, SystemTime}; use hydroflow_plus::*; use serde::{Deserialize, Serialize}; use stageleft::*; -use stream::{NoTick, Tick}; use tokio::time::Instant; pub struct Proposer {} @@ -119,18 +118,21 @@ pub fn paxos( flow.source_iter(&proposers, q!(["Proposers say hello"])) .for_each(q!(|s| println!("{}", s))); let p_id = flow.cluster_self_id(&proposers); - let (p_is_leader_complete_cycle, p_is_leader) = flow.tick_cycle(&proposers); + let (p_is_leader_complete_cycle, p_is_leader) = + flow.cycle::>(&proposers); let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader) = - flow.cycle(&proposers); - let (a_to_proposers_p1b_complete_cycle, a_to_proposers_p1b) = flow.cycle(&proposers); + flow.cycle::>>(&proposers); + let (a_to_proposers_p1b_complete_cycle, a_to_proposers_p1b) = + flow.cycle::>(&proposers); a_to_proposers_p1b .clone() .for_each(q!(|(_, p1b): (u32, P1b)| println!( "Proposer received P1b: {:?}", p1b ))); - let (a_to_proposers_p2b_complete_cycle, a_to_proposers_p2b) = flow.cycle(&proposers); + let (a_to_proposers_p2b_complete_cycle, a_to_proposers_p2b) = + flow.cycle::>(&proposers); // a_to_proposers_p2b.clone().for_each(q!(|(_, p2b): (u32, P2b)| println!("Proposer received P2b: {:?}", p2b))); // p_to_proposers_i_am_leader.clone().for_each(q!(|ballot: Ballot| println!("Proposer received I am leader: {:?}", ballot))); // c_to_proposers.clone().for_each(q!(|payload: ClientPayload| println!("Client sent proposer payload: {:?}", payload))); @@ -219,14 +221,14 @@ pub fn paxos( #[allow(clippy::type_complexity)] fn acceptor<'a>( - p_to_acceptors_p1a: Stream<'a, P1a, stream::Unbounded, NoTick, Cluster>, - p_to_acceptors_p2a: Stream<'a, P2a, stream::Unbounded, NoTick, Cluster>, - r_to_acceptors_checkpoint: Stream<'a, (u32, i32), stream::Unbounded, NoTick, Cluster>, + p_to_acceptors_p1a: Stream<'a, P1a, Unbounded, NoTick, Cluster>, + p_to_acceptors_p2a: Stream<'a, P2a, Unbounded, NoTick, Cluster>, + r_to_acceptors_checkpoint: Stream<'a, (u32, i32), Unbounded, NoTick, Cluster>, proposers: &Cluster, f: usize, ) -> ( - Stream<'a, (u32, P1b), stream::Unbounded, NoTick, Cluster>, - Stream<'a, (u32, P2b), stream::Unbounded, NoTick, Cluster>, + Stream<'a, (u32, P1b), Unbounded, NoTick, Cluster>, + Stream<'a, (u32, P2b), Unbounded, NoTick, Cluster>, ) { // Get the latest checkpoint sequence per replica let a_checkpoint_largest_seqs = r_to_acceptors_checkpoint @@ -292,7 +294,7 @@ fn acceptor<'a>( } )); let a_log = a_p2as_to_place_in_log - .union(a_new_checkpoint) + .union(a_new_checkpoint.into_stream()) .persist() .fold( q!(|| (-1, HashMap::::new())), @@ -373,13 +375,12 @@ fn acceptor<'a>( fn p_p2b<'a>( flow: &FlowBuilder<'a>, proposers: &Cluster, - a_to_proposers_p2b: Stream<'a, (u32, P2b), stream::Unbounded, NoTick, Cluster>, + a_to_proposers_p2b: Stream<'a, (u32, P2b), Unbounded, NoTick, Cluster>, replicas: &Cluster, f: usize, -) -> Stream<'a, ReplicaPayload, stream::Unbounded, NoTick, Cluster> { - let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = - flow.tick_cycle(proposers); - let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = flow.tick_cycle(proposers); +) -> Stream<'a, ReplicaPayload, Unbounded, NoTick, Cluster> { + let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = flow.cycle(proposers); + let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = flow.cycle(proposers); let p_p2b = a_to_proposers_p2b .clone() .tick_batch() @@ -461,19 +462,19 @@ fn p_p2b<'a>( fn p_p2a<'a>( flow: &FlowBuilder<'a>, proposers: &Cluster, - p_max_slot: Stream<'a, i32, stream::Bounded, Tick, Cluster>, - c_to_proposers: Stream<'a, ClientPayload, stream::Unbounded, NoTick, Cluster>, - p_ballot_num: Stream<'a, u32, stream::Bounded, Tick, Cluster>, - p_log_to_try_commit: Stream<'a, P2a, stream::Bounded, Tick, Cluster>, - p_log_holes: Stream<'a, P2a, stream::Bounded, Tick, Cluster>, - p_is_leader: Stream<'a, bool, stream::Bounded, Tick, Cluster>, + p_max_slot: Singleton<'a, i32, Bounded, Tick, Cluster>, + c_to_proposers: Stream<'a, ClientPayload, Unbounded, NoTick, Cluster>, + p_ballot_num: Optional<'a, u32, Bounded, Tick, Cluster>, + p_log_to_try_commit: Stream<'a, P2a, Bounded, Tick, Cluster>, + p_log_holes: Stream<'a, P2a, Bounded, Tick, Cluster>, + p_is_leader: Optional<'a, bool, Bounded, Tick, Cluster>, acceptors: &Cluster, ) -> ( - Stream<'a, i32, stream::Bounded, Tick, Cluster>, - Stream<'a, P2a, stream::Unbounded, NoTick, Cluster>, + Optional<'a, i32, Bounded, Tick, Cluster>, + Stream<'a, P2a, Unbounded, NoTick, Cluster>, ) { let p_id = flow.cluster_self_id(proposers); - let (p_next_slot_complete_cycle, p_next_slot) = flow.tick_cycle(proposers); + let (p_next_slot_complete_cycle, p_next_slot) = flow.cycle::>(proposers); let p_next_slot_after_reconciling_p1bs = p_max_slot // .inspect(q!(|max_slot| println!("{} p_max_slot: {:?}", context.current_tick(), max_slot))) .continue_unless(p_next_slot.clone()) @@ -534,15 +535,15 @@ fn p_p2a<'a>( fn p_p1b<'a>( flow: &FlowBuilder<'a>, proposers: &Cluster, - a_to_proposers_p1b: Stream<'a, (u32, P1b), stream::Unbounded, NoTick, Cluster>, - p_ballot_num: Stream<'a, u32, stream::Bounded, Tick, Cluster>, - p_has_largest_ballot: Stream<'a, (Ballot, u32), stream::Bounded, Tick, Cluster>, + a_to_proposers_p1b: Stream<'a, (u32, P1b), Unbounded, NoTick, Cluster>, + p_ballot_num: Optional<'a, u32, Bounded, Tick, Cluster>, + p_has_largest_ballot: Optional<'a, (Ballot, u32), Bounded, Tick, Cluster>, f: usize, ) -> ( - Stream<'a, bool, stream::Bounded, Tick, Cluster>, - Stream<'a, P2a, stream::Bounded, Tick, Cluster>, - Stream<'a, i32, stream::Bounded, Tick, Cluster>, - Stream<'a, P2a, stream::Bounded, Tick, Cluster>, + Optional<'a, bool, Bounded, Tick, Cluster>, + Stream<'a, P2a, Bounded, Tick, Cluster>, + Singleton<'a, i32, Bounded, Tick, Cluster>, + Stream<'a, P2a, Bounded, Tick, Cluster>, ) { let p_id = flow.cluster_self_id(proposers); let p_relevant_p1bs = a_to_proposers_p1b @@ -652,13 +653,13 @@ fn p_p1b<'a>( fn replica<'a>( flow: &FlowBuilder<'a>, replicas: &Cluster, - p_to_replicas: Stream<'a, ReplicaPayload, stream::Unbounded, NoTick, Cluster>, + p_to_replicas: Stream<'a, ReplicaPayload, Unbounded, NoTick, Cluster>, checkpoint_frequency: usize, ) -> ( - Stream<'a, i32, stream::Unbounded, NoTick, Cluster>, - Stream<'a, (u32, ReplicaPayload), stream::Unbounded, NoTick, Cluster>, + Stream<'a, i32, Unbounded, NoTick, Cluster>, + Stream<'a, (u32, ReplicaPayload), Unbounded, NoTick, Cluster>, ) { - let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = flow.tick_cycle(replicas); + let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = flow.cycle(replicas); // p_to_replicas.inspect(q!(|payload: ReplicaPayload| println!("Replica received payload: {:?}", payload))); let r_sorted_payloads = p_to_replicas .clone() @@ -666,8 +667,9 @@ fn replica<'a>( .union(r_buffered_payloads) // Combine with all payloads that we've received and not processed yet .sort(); // Create a cycle since we'll use this seq before we define it - let (r_highest_seq_complete_cycle, r_highest_seq) = flow.tick_cycle(replicas); - let empty_slot = flow.source_iter(replicas, q!([-1])).tick_batch(); + let (r_highest_seq_complete_cycle, r_highest_seq) = + flow.cycle::>(replicas); + let empty_slot = flow.source_iter(replicas, q!([-1])).tick_batch().first(); // Either the max sequence number executed so far or -1. Need to union otherwise r_highest_seq is empty and joins with it will fail let r_highest_seq_with_default = r_highest_seq.union(empty_slot); // Find highest the sequence number of any payload that can be processed in this tick. This is the payload right before a hole. @@ -732,10 +734,11 @@ fn replica<'a>( i32 )| highest_seq)) .defer_tick(); - r_highest_seq_complete_cycle.complete(r_new_highest_seq.clone()); + r_highest_seq_complete_cycle.complete(r_new_highest_seq.clone().into()); // Send checkpoints to the acceptors when we've processed enough payloads - let (r_checkpointed_seqs_complete_cycle, r_checkpointed_seqs) = flow.tick_cycle(replicas); + let (r_checkpointed_seqs_complete_cycle, r_checkpointed_seqs) = + flow.cycle::>(replicas); let r_max_checkpointed_seq = r_checkpointed_seqs.persist().fold( q!(|| -1), q!(|max_seq: &mut i32, seq: i32| { @@ -770,11 +773,11 @@ fn replica<'a>( // Clients. All relations for clients will be prefixed with c. All ClientPayloads will contain the virtual client number as key and the client's machine ID (to string) as value. Expects p_to_clients_leader_elected containing Ballots whenever the leader is elected, and r_to_clients_payload_applied containing ReplicaPayloads whenever a payload is committed. Outputs (leader address, ClientPayload) when a new leader is elected or when the previous payload is committed. fn client<'a>( clients: &Cluster, - p_to_clients_leader_elected: Stream<'a, Ballot, stream::Unbounded, NoTick, Cluster>, + p_to_clients_leader_elected: Stream<'a, Ballot, Unbounded, NoTick, Cluster>, r_to_clients_payload_applied: Stream< 'a, (u32, ReplicaPayload), - stream::Unbounded, + Unbounded, NoTick, Cluster, >, @@ -782,7 +785,7 @@ fn client<'a>( num_clients_per_node: usize, median_latency_window_size: usize, f: usize, -) -> Stream<'a, (u32, ClientPayload), stream::Unbounded, NoTick, Cluster> { +) -> Stream<'a, (u32, ClientPayload), Unbounded, NoTick, Cluster> { let c_id = flow.cluster_self_id(clients); p_to_clients_leader_elected .clone() @@ -814,8 +817,7 @@ fn client<'a>( } )))); // Whenever replicas confirm that a payload was committed, collected it and wait for a quorum - let (c_pending_quorum_payloads_complete_cycle, c_pending_quorum_payloads) = - flow.tick_cycle(clients); + let (c_pending_quorum_payloads_complete_cycle, c_pending_quorum_payloads) = flow.cycle(clients); let c_received_payloads = r_to_clients_payload_applied .tick_batch() .map(q!(|(sender, replica_payload): (u32, ReplicaPayload)| ( @@ -858,7 +860,8 @@ fn client<'a>( .all_ticks(); // Track statistics - let (c_timers_complete_cycle, c_timers) = flow.tick_cycle(clients); + let (c_timers_complete_cycle, c_timers) = + flow.cycle::>(clients); let c_new_timers_when_leader_elected = c_new_leader_ballot .map(q!(|_: Ballot| SystemTime::now())) .flat_map(q!(move |now: SystemTime| (0..num_clients_per_node) @@ -880,7 +883,8 @@ fn client<'a>( let c_stats_output_timer = flow .source_interval(clients, q!(Duration::from_secs(1))) - .tick_batch(); + .tick_batch() + .first(); let c_latency_reset = c_stats_output_timer .clone() @@ -895,7 +899,7 @@ fn client<'a>( )| Some( curr_time.duration_since(prev_time).unwrap().as_micros() ))) - .union(c_latency_reset) + .union(c_latency_reset.into_stream()) .persist() .fold( // Create window with ring buffer using vec + wraparound index @@ -994,10 +998,10 @@ fn client<'a>( // Proposer logic to calculate the largest ballot received so far. fn p_max_ballot<'a>( - a_to_proposers_p1b: Stream<'a, (u32, P1b), stream::Unbounded, NoTick, Cluster>, - a_to_proposers_p2b: Stream<'a, (u32, P2b), stream::Unbounded, NoTick, Cluster>, - p_to_proposers_i_am_leader: Stream<'a, Ballot, stream::Unbounded, NoTick, Cluster>, -) -> Stream<'a, Ballot, stream::Bounded, Tick, Cluster> { + a_to_proposers_p1b: Stream<'a, (u32, P1b), Unbounded, NoTick, Cluster>, + a_to_proposers_p2b: Stream<'a, (u32, P2b), Unbounded, NoTick, Cluster>, + p_to_proposers_i_am_leader: Stream<'a, Ballot, Unbounded, NoTick, Cluster>, +) -> Singleton<'a, Ballot, Bounded, Tick, Cluster> { let p_received_p1b_ballots = a_to_proposers_p1b .clone() .map(q!(|(_, p1b): (_, P1b)| p1b.max_ballot)); @@ -1025,13 +1029,14 @@ fn p_max_ballot<'a>( fn p_ballot_calc<'a>( flow: &FlowBuilder<'a>, proposers: &Cluster, - p_received_max_ballot: Stream<'a, Ballot, stream::Bounded, Tick, Cluster>, + p_received_max_ballot: Singleton<'a, Ballot, Bounded, Tick, Cluster>, ) -> ( - Stream<'a, u32, stream::Bounded, Tick, Cluster>, - Stream<'a, (Ballot, u32), stream::Bounded, Tick, Cluster>, + Optional<'a, u32, Bounded, Tick, Cluster>, + Optional<'a, (Ballot, u32), Bounded, Tick, Cluster>, ) { let p_id = flow.cluster_self_id(proposers); - let (p_ballot_num_complete_cycle, p_ballot_num) = flow.tick_cycle(proposers); + let (p_ballot_num_complete_cycle, p_ballot_num) = + flow.cycle::>(proposers); let p_has_largest_ballot = p_received_max_ballot .clone() .cross_singleton(p_ballot_num.clone()) @@ -1062,7 +1067,7 @@ fn p_ballot_calc<'a>( } })) .defer_tick(); - let p_start_ballot_num = flow.source_iter(proposers, q!([0])).tick_batch(); + let p_start_ballot_num = flow.source_iter(proposers, q!([0])).tick_batch().first(); p_ballot_num_complete_cycle.complete(p_start_ballot_num.union(p_new_ballot_num)); // End stable leader election (p_ballot_num, p_has_largest_ballot) @@ -1071,18 +1076,18 @@ fn p_ballot_calc<'a>( // Proposer logic to send "I am leader" messages periodically to other proposers, or send p1a to acceptors if other leaders expired. #[allow(clippy::too_many_arguments, clippy::type_complexity)] fn p_p1a<'a>( - p_ballot_num: Stream<'a, u32, stream::Bounded, Tick, Cluster>, - p_is_leader: Stream<'a, bool, stream::Bounded, Tick, Cluster>, + p_ballot_num: Optional<'a, u32, Bounded, Tick, Cluster>, + p_is_leader: Optional<'a, bool, Bounded, Tick, Cluster>, proposers: &Cluster, - p_to_proposers_i_am_leader: Stream<'a, Ballot, stream::Unbounded, NoTick, Cluster>, + p_to_proposers_i_am_leader: Stream<'a, Ballot, Unbounded, NoTick, Cluster>, flow: &FlowBuilder<'a>, acceptors: &Cluster, i_am_leader_send_timeout: u64, // How often to heartbeat i_am_leader_check_timeout: u64, // How often to check if heartbeat expired i_am_leader_check_timeout_delay_multiplier: usize, /* Initial delay, multiplied by proposer pid, to stagger proposers checking for timeouts */ ) -> ( - Stream<'a, Ballot, stream::Unbounded, NoTick, Cluster>, - Stream<'a, P1a, stream::Unbounded, NoTick, Cluster>, + Stream<'a, Ballot, Unbounded, NoTick, Cluster>, + Stream<'a, P1a, Unbounded, NoTick, Cluster>, ) { let p_id = flow.cluster_self_id(proposers); let p_to_proposers_i_am_leader_new = p_ballot_num @@ -1112,6 +1117,7 @@ fn p_p1a<'a>( // Add random delay depending on node ID so not everyone sends p1a at the same time let p_leader_expired = flow.source_interval_delayed(proposers, q!(Duration::from_secs((p_id * i_am_leader_check_timeout_delay_multiplier as u32).into())), q!(Duration::from_secs(i_am_leader_check_timeout))) .tick_batch() + .first() .cross_singleton(p_latest_received_i_am_leader.clone()) // .inspect(q!(|v| println!("Proposer checking if leader expired"))) // .continue_if(p_is_leader.clone().count().filter(q!(|c| *c == 0)).inspect(q!(|c| println!("Proposer is_leader count: {}", c)))) diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos__tests__paxos_ir.snap index 4e569fa25362..5b990667a869 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos__tests__paxos_ir.snap @@ -306,7 +306,7 @@ expression: built.ir() Map { f: { use crate :: __staged :: cluster :: paxos :: * ; | batch_size : usize | (batch_size , false) }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Fold { init: { use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }, @@ -349,9 +349,9 @@ expression: built.ir() }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Filter { - f: { use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }, input: Fold { init: { use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }, acc: { use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }, @@ -521,7 +521,7 @@ expression: built.ir() value: Filter { f: { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; move | (_ , latest_received_i_am_leader) : & (_ , Option < Instant >) | { if let Some (latest_received_i_am_leader) = latest_received_i_am_leader { (Instant :: now () . duration_since (* latest_received_i_am_leader)) > Duration :: from_secs (i_am_leader_check_timeout) } else { true } } }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( CrossSingleton( Source { @@ -556,9 +556,9 @@ expression: built.ir() }, ), Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Filter { - f: { use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }, input: Fold { init: { use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }, acc: { use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }, @@ -685,7 +685,7 @@ expression: built.ir() 0, ), input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( FilterMap { f: { use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received : usize | if num_received > f { Some (true) } else { None } }, @@ -735,7 +735,7 @@ expression: built.ir() }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Tee { inner: RefCell { value: Filter { @@ -829,14 +829,14 @@ expression: built.ir() Tee { inner: RefCell { value: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Union( Union( Map { f: { use crate :: __staged :: cluster :: paxos :: * ; | max_slot : i32 | max_slot + 1 }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Tee { inner: RefCell { @@ -893,9 +893,9 @@ expression: built.ir() }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Filter { - f: { use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }, input: Fold { init: { use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }, acc: { use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }, @@ -923,7 +923,7 @@ expression: built.ir() Tee { inner: RefCell { value: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Tee { inner: RefCell { @@ -1064,7 +1064,7 @@ expression: built.ir() }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Tee { inner: RefCell { value: Filter { @@ -1231,7 +1231,7 @@ expression: built.ir() }, ), Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Tee { inner: RefCell { @@ -1246,9 +1246,9 @@ expression: built.ir() }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Filter { - f: { use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }, input: Fold { init: { use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }, acc: { use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }, @@ -1404,7 +1404,7 @@ expression: built.ir() }, ), Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Tee { inner: RefCell { value: CycleSource { @@ -1425,7 +1425,7 @@ expression: built.ir() Map { f: { use crate :: __staged :: cluster :: paxos :: * ; | _ : bool | 0 }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Tee { inner: RefCell { @@ -1440,23 +1440,23 @@ expression: built.ir() }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Filter { - f: { use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }, input: Fold { init: { use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }, acc: { use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }, input: Tee { inner: RefCell { value: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Union( Union( Map { f: { use crate :: __staged :: cluster :: paxos :: * ; | max_slot : i32 | max_slot + 1 }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Tee { inner: RefCell { @@ -1513,9 +1513,9 @@ expression: built.ir() }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Filter { - f: { use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }, input: Fold { init: { use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }, acc: { use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }, @@ -1543,7 +1543,7 @@ expression: built.ir() Tee { inner: RefCell { value: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Tee { inner: RefCell { @@ -1684,7 +1684,7 @@ expression: built.ir() }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Tee { inner: RefCell { value: Filter { @@ -1851,7 +1851,7 @@ expression: built.ir() }, ), Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Tee { inner: RefCell { @@ -1866,9 +1866,9 @@ expression: built.ir() }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Filter { - f: { use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }, input: Fold { init: { use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }, acc: { use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }, @@ -2024,7 +2024,7 @@ expression: built.ir() }, ), Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Tee { inner: RefCell { value: CycleSource { @@ -2094,7 +2094,7 @@ expression: built.ir() f: { use crate :: __staged :: cluster :: paxos :: * ; let p_id = __hydroflow_plus_cluster_self_id_0 ; move | (_is_leader , ballot_num) : (bool , u32) | Ballot { num : ballot_num , id : p_id } }, input: CrossSingleton( Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Tee { inner: RefCell { @@ -2109,9 +2109,9 @@ expression: built.ir() }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Filter { - f: { use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }, input: Fold { init: { use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }, acc: { use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }, @@ -2391,7 +2391,7 @@ expression: built.ir() input: Map { f: { use crate :: __staged :: cluster :: paxos :: * ; let p_id = __hydroflow_plus_cluster_self_id_0 ; move | ballot_num : u32 | P1a { ballot : Ballot { num : ballot_num , id : p_id } } }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Tee { inner: RefCell { @@ -2406,13 +2406,13 @@ expression: built.ir() }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Tee { inner: RefCell { value: Filter { f: { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; move | (_ , latest_received_i_am_leader) : & (_ , Option < Instant >) | { if let Some (latest_received_i_am_leader) = latest_received_i_am_leader { (Instant :: now () . duration_since (* latest_received_i_am_leader)) > Duration :: from_secs (i_am_leader_check_timeout) } else { true } } }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( CrossSingleton( Source { @@ -2447,9 +2447,9 @@ expression: built.ir() }, ), Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Filter { - f: { use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }, input: Fold { init: { use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }, acc: { use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }, @@ -2559,7 +2559,7 @@ expression: built.ir() input: Map { f: { use crate :: __staged :: cluster :: paxos :: * ; let p_id = __hydroflow_plus_cluster_self_id_0 ; move | ballot_num : u32 | P1a { ballot : Ballot { num : ballot_num , id : p_id } } }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Tee { inner: RefCell { @@ -2574,13 +2574,13 @@ expression: built.ir() }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Tee { inner: RefCell { value: Filter { f: { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; move | (_ , latest_received_i_am_leader) : & (_ , Option < Instant >) | { if let Some (latest_received_i_am_leader) = latest_received_i_am_leader { (Instant :: now () . duration_since (* latest_received_i_am_leader)) > Duration :: from_secs (i_am_leader_check_timeout) } else { true } } }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( CrossSingleton( Source { @@ -2615,9 +2615,9 @@ expression: built.ir() }, ), Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Filter { - f: { use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }, input: Fold { init: { use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }, acc: { use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }, @@ -2693,7 +2693,7 @@ expression: built.ir() input: Map { f: { use crate :: __staged :: cluster :: paxos :: * ; let p_id = __hydroflow_plus_cluster_self_id_0 ; move | ballot_num : u32 | P1a { ballot : Ballot { num : ballot_num , id : p_id } } }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Tee { inner: RefCell { @@ -2708,13 +2708,13 @@ expression: built.ir() }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Tee { inner: RefCell { value: Filter { f: { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; move | (_ , latest_received_i_am_leader) : & (_ , Option < Instant >) | { if let Some (latest_received_i_am_leader) = latest_received_i_am_leader { (Instant :: now () . duration_since (* latest_received_i_am_leader)) > Duration :: from_secs (i_am_leader_check_timeout) } else { true } } }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( CrossSingleton( Source { @@ -2749,9 +2749,9 @@ expression: built.ir() }, ), Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Filter { - f: { use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }, input: Fold { init: { use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }, acc: { use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }, @@ -3281,7 +3281,7 @@ expression: built.ir() input: Map { f: { use crate :: __staged :: cluster :: paxos :: * ; let p_id = __hydroflow_plus_cluster_self_id_0 ; move | ballot_num : u32 | P1a { ballot : Ballot { num : ballot_num , id : p_id } } }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Tee { inner: RefCell { @@ -3296,13 +3296,13 @@ expression: built.ir() }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Tee { inner: RefCell { value: Filter { f: { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; move | (_ , latest_received_i_am_leader) : & (_ , Option < Instant >) | { if let Some (latest_received_i_am_leader) = latest_received_i_am_leader { (Instant :: now () . duration_since (* latest_received_i_am_leader)) > Duration :: from_secs (i_am_leader_check_timeout) } else { true } } }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( CrossSingleton( Source { @@ -3337,9 +3337,9 @@ expression: built.ir() }, ), Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Filter { - f: { use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }, input: Fold { init: { use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }, acc: { use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }, @@ -3965,7 +3965,7 @@ expression: built.ir() input: Map { f: { use crate :: __staged :: cluster :: paxos :: * ; let p_id = __hydroflow_plus_cluster_self_id_0 ; move | ballot_num : u32 | P1a { ballot : Ballot { num : ballot_num , id : p_id } } }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Tee { inner: RefCell { @@ -3980,13 +3980,13 @@ expression: built.ir() }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Tee { inner: RefCell { value: Filter { f: { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; move | (_ , latest_received_i_am_leader) : & (_ , Option < Instant >) | { if let Some (latest_received_i_am_leader) = latest_received_i_am_leader { (Instant :: now () . duration_since (* latest_received_i_am_leader)) > Duration :: from_secs (i_am_leader_check_timeout) } else { true } } }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( CrossSingleton( Source { @@ -4021,9 +4021,9 @@ expression: built.ir() }, ), Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Filter { - f: { use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }, input: Fold { init: { use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }, acc: { use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }, diff --git a/hydroflow_plus_test_local/src/local/compute_pi.rs b/hydroflow_plus_test_local/src/local/compute_pi.rs index f700f30e9fda..98735c10a60f 100644 --- a/hydroflow_plus_test_local/src/local/compute_pi.rs +++ b/hydroflow_plus_test_local/src/local/compute_pi.rs @@ -20,9 +20,11 @@ pub fn compute_pi(flow: &FlowBuilder, batch_size: RuntimeData) -> Process *total += 1; }), - ); + ) + .all_ticks(); trials + .tick_batch() .persist() .reduce(q!(|(inside, total), (inside_batch, total_batch)| { *inside += inside_batch; diff --git a/hydroflow_plus_test_local/src/local/graph_reachability.rs b/hydroflow_plus_test_local/src/local/graph_reachability.rs index fcea5f07f39b..91c2ea4afa7d 100644 --- a/hydroflow_plus_test_local/src/local/graph_reachability.rs +++ b/hydroflow_plus_test_local/src/local/graph_reachability.rs @@ -16,7 +16,7 @@ pub fn graph_reachability<'a>( let roots = flow.source_stream(&process, roots).tick_batch(); let edges = flow.source_stream(&process, edges); - let (set_reached_cycle, reached_cycle) = flow.tick_cycle(&process); + let (set_reached_cycle, reached_cycle) = flow.cycle(&process); let reached = roots.union(reached_cycle); let reachable = reached