Skip to content

Commit

Permalink
feat(hydroflow_plus)!: introduce an unordered variant of streams to s…
Browse files Browse the repository at this point in the history
…trengthen determinism guarantees (#1568)

Previously, sending data from a `Cluster` would return a stream assumed
to have deterministic contents **and** ordering, which is false. This
introduces another type parameter for `Stream` which tracks whether
element ordering is expected to be deterministic, and restricts
operators such as `fold` and `reduce` to commutative aggregations
accordingly.
  • Loading branch information
shadaj authored Nov 21, 2024
1 parent 0c104c5 commit 4c5ca31
Show file tree
Hide file tree
Showing 16 changed files with 799 additions and 575 deletions.
18 changes: 15 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions hydroflow_plus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ hydro_deploy = { path = "../hydro_deploy/core", version = "^0.10.0", optional =
prettyplease = { version = "0.2.0", features = [ "verbatim" ], optional = true }
toml = { version = "0.8.0", optional = true }
trybuild-internals-api = { version = "1.0.99", optional = true }
sealed = "0.6.0"

[build-dependencies]
stageleft_tool = { path = "../stageleft_tool", version = "^0.4.0" }
Expand Down
7 changes: 7 additions & 0 deletions hydroflow_plus/src/boundedness.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/// Marks the stream as being unbounded, which means that it is not
/// guaranteed to be complete in finite time.
pub enum Unbounded {}

/// Marks the stream as being bounded, which means that it is guaranteed
/// to be complete in finite time.
pub enum Bounded {}
5 changes: 4 additions & 1 deletion hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ pub mod runtime_support {
pub mod runtime_context;
pub use runtime_context::RuntimeContext;

pub mod boundedness;
pub use boundedness::{Bounded, Unbounded};

pub mod stream;
pub use stream::{Bounded, Stream, Unbounded};
pub use stream::Stream;

pub mod singleton;
pub use singleton::Singleton;
Expand Down
10 changes: 10 additions & 0 deletions hydroflow_plus/src/location/can_send.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
use stageleft::quote_type;

use super::{Cluster, ClusterId, ExternalProcess, Location, Process};
use crate::stream::NoOrder;

pub trait CanSend<'a, To: Location<'a>>: Location<'a> {
type In<T>;
type Out<T>;

/// Given the ordering guarantees of the input, determines the strongest possible
/// ordering guarantees of the output.
type OutStrongestOrder<InOrder>;

fn is_demux() -> bool;
fn tagged_type() -> Option<syn::Type>;
}

impl<'a, P1, P2> CanSend<'a, Process<'a, P2>> for Process<'a, P1> {
type In<T> = T;
type Out<T> = T;
type OutStrongestOrder<InOrder> = InOrder;

fn is_demux() -> bool {
false
Expand All @@ -26,6 +32,7 @@ impl<'a, P1, P2> CanSend<'a, Process<'a, P2>> for Process<'a, P1> {
impl<'a, P1, C2> CanSend<'a, Cluster<'a, C2>> for Process<'a, P1> {
type In<T> = (ClusterId<C2>, T);
type Out<T> = T;
type OutStrongestOrder<InOrder> = InOrder;

fn is_demux() -> bool {
true
Expand All @@ -39,6 +46,7 @@ impl<'a, P1, C2> CanSend<'a, Cluster<'a, C2>> for Process<'a, P1> {
impl<'a, C1, P2> CanSend<'a, Process<'a, P2>> for Cluster<'a, C1> {
type In<T> = T;
type Out<T> = (ClusterId<C1>, T);
type OutStrongestOrder<InOrder> = NoOrder;

fn is_demux() -> bool {
false
Expand All @@ -52,6 +60,7 @@ impl<'a, C1, P2> CanSend<'a, Process<'a, P2>> for Cluster<'a, C1> {
impl<'a, C1, C2> CanSend<'a, Cluster<'a, C2>> for Cluster<'a, C1> {
type In<T> = (ClusterId<C2>, T);
type Out<T> = (ClusterId<C1>, T);
type OutStrongestOrder<InOrder> = NoOrder;

fn is_demux() -> bool {
true
Expand All @@ -65,6 +74,7 @@ impl<'a, C1, C2> CanSend<'a, Cluster<'a, C2>> for Cluster<'a, C1> {
impl<'a, P1, E2> CanSend<'a, ExternalProcess<'a, E2>> for Process<'a, P1> {
type In<T> = T;
type Out<T> = T;
type OutStrongestOrder<InOrder> = InOrder;

fn is_demux() -> bool {
false
Expand Down
12 changes: 0 additions & 12 deletions hydroflow_plus/src/location/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,6 @@ impl<C> PartialEq for ClusterId<C> {

impl<C> Eq for ClusterId<C> {}

impl<C> PartialOrd for ClusterId<C> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl<C> Ord for ClusterId<C> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.raw_id.cmp(&other.raw_id)
}
}

impl<C> Hash for ClusterId<C> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.raw_id.hash(state)
Expand Down
3 changes: 1 addition & 2 deletions hydroflow_plus/src/singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use crate::cycle::{
};
use crate::ir::{HfPlusLeaf, HfPlusNode, TeeNode};
use crate::location::{check_matching_location, Location, LocationId, NoTick, Tick};
use crate::stream::{Bounded, Unbounded};
use crate::{Optional, Stream};
use crate::{Bounded, Optional, Stream, Unbounded};

pub struct Singleton<T, L, B> {
pub(crate) location: L,
Expand Down
Loading

0 comments on commit 4c5ca31

Please sign in to comment.