Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hydroflow_plus): add unbounded top-level singletons #1427

Merged
merged 1 commit into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions hydroflow_plus/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::cycle::CycleCollection;
use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource};
use crate::location::{Cluster, Location, LocationId, Process};
use crate::stream::{Bounded, NoTick, Tick, Unbounded};
use crate::{HfCycle, RuntimeContext, Stream};
use crate::{HfCycle, Optional, RuntimeContext, Stream};

pub mod built;
pub mod deploy;
Expand Down Expand Up @@ -251,10 +251,10 @@ impl<'a> FlowBuilder<'a> {
&self,
on: &L,
interval: impl Quoted<'a, Duration> + Copy + 'a,
) -> Stream<'a, (), Unbounded, NoTick, L> {
) -> Optional<'a, (), Unbounded, NoTick, L> {
let interval = interval.splice();

Stream::new(
Optional::new(
on.id(),
self.ir_leaves().clone(),
HfPlusNode::Persist(Box::new(HfPlusNode::Source {
Expand All @@ -269,13 +269,16 @@ impl<'a> FlowBuilder<'a> {
on: &L,
delay: impl Quoted<'a, Duration> + Copy + 'a,
interval: impl Quoted<'a, Duration> + Copy + 'a,
) -> Stream<'a, tokio::time::Instant, Unbounded, NoTick, L> {
) -> Optional<'a, tokio::time::Instant, Unbounded, NoTick, L> {
self.source_stream(
on,
q!(tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
)),
)
.tick_batch()
.first()
.latest()
}

pub fn cycle<S: CycleCollection<'a>>(&self, on: &S::Location) -> (HfCycle<'a, S>, S) {
Expand Down
162 changes: 142 additions & 20 deletions hydroflow_plus/src/singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use stageleft::{q, IntoQuotedMut, Quoted};

use crate::builder::FlowLeaves;
use crate::cycle::CycleCollection;
use crate::ir::{HfPlusLeaf, HfPlusNode};
use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource};
use crate::location::{Location, LocationId};
use crate::stream::{Bounded, NoTick, Tick, Unbounded};
use crate::Stream;
Expand Down Expand Up @@ -90,11 +90,11 @@ impl<'a, T: Clone, W, C, N: Location> Clone for Singleton<'a, T, W, C, N> {
}
}

impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
impl<'a, T, W, C, N: Location> Singleton<'a, T, W, C, N> {
pub fn map<U, F: Fn(T) -> U + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Singleton<'a, U, Bounded, Tick, N> {
) -> Singleton<'a, U, W, C, N> {
Singleton::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -108,7 +108,7 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
pub fn flat_map<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Stream<'a, U, Bounded, Tick, N> {
) -> Stream<'a, U, W, C, N> {
Stream::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -122,7 +122,7 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
pub fn filter<F: Fn(&T) -> bool + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Optional<'a, T, Bounded, Tick, N> {
) -> Optional<'a, T, W, C, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -136,7 +136,7 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
pub fn filter_map<U, F: Fn(T) -> Option<U> + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Optional<'a, U, Bounded, Tick, N> {
) -> Optional<'a, U, W, C, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -146,7 +146,9 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
},
)
}
}

impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
pub fn cross_singleton<O>(
self,
other: impl Into<Optional<'a, O, Bounded, Tick, N>>,
Expand Down Expand Up @@ -194,6 +196,14 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
)
}

pub fn latest(self) -> Optional<'a, T, Unbounded, NoTick, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
HfPlusNode::Persist(Box::new(self.ir_node.into_inner())),
)
}

pub fn defer_tick(self) -> Singleton<'a, T, Bounded, Tick, N> {
Singleton::new(
self.location_kind,
Expand All @@ -219,6 +229,33 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> {
}
}

impl<'a, T, N: Location> Singleton<'a, T, Unbounded, NoTick, N> {
pub fn latest_tick(self) -> Singleton<'a, T, Bounded, Tick, N> {
Singleton::new(
self.location_kind,
self.ir_leaves,
HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())),
)
}

pub fn cross_singleton<O>(
self,
other: impl Into<Optional<'a, O, Unbounded, NoTick, N>>,
) -> Optional<'a, (T, O), Unbounded, NoTick, N>
where
O: Clone,
{
let other: Optional<'a, O, Unbounded, NoTick, N> = other.into();
if self.location_kind != other.location_kind {
panic!("cross_singleton must be called on streams on the same node");
}

self.latest_tick()
.cross_singleton(other.latest_tick())
.latest()
}
}

pub struct Optional<'a, T, W, C, N: Location> {
pub(crate) location_kind: LocationId,

Expand Down Expand Up @@ -274,6 +311,29 @@ impl<'a, T, W, N: Location> CycleCollection<'a> for Optional<'a, T, W, Tick, N>
}
}

impl<'a, T, W, N: Location> CycleCollection<'a> for Optional<'a, T, W, NoTick, N> {
type Location = N;

fn create_source(ident: syn::Ident, ir_leaves: FlowLeaves<'a>, l: LocationId) -> Self {
Optional::new(
l,
ir_leaves,
HfPlusNode::Persist(Box::new(HfPlusNode::CycleSource {
ident,
location_kind: l,
})),
)
}

fn complete(self, ident: syn::Ident) {
self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
ident,
location_kind: self.location_kind,
input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))),
});
}
}

impl<'a, T, W, C, N: Location> From<Singleton<'a, T, W, C, N>> for Optional<'a, T, W, C, N> {
fn from(singleton: Singleton<'a, T, W, C, N>) -> Self {
Optional::some(singleton)
Expand Down Expand Up @@ -305,20 +365,11 @@ impl<'a, T: Clone, W, C, N: Location> Clone for Optional<'a, T, W, C, N> {
}
}

impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
// TODO(shadaj): this is technically incorrect; we should only return the first element of the stream
pub fn into_stream(self) -> Stream<'a, T, Bounded, Tick, N> {
Stream::new(
self.location_kind,
self.ir_leaves,
self.ir_node.into_inner(),
)
}

impl<'a, T, W, C, N: Location> Optional<'a, T, W, C, N> {
pub fn map<U, F: Fn(T) -> U + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Optional<'a, U, Bounded, Tick, N> {
) -> Optional<'a, U, W, C, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -332,7 +383,7 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
pub fn flat_map<U, I: IntoIterator<Item = U>, F: Fn(T) -> I + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Stream<'a, U, Bounded, Tick, N> {
) -> Stream<'a, U, W, C, N> {
Stream::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -346,7 +397,7 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
pub fn filter<F: Fn(&T) -> bool + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Optional<'a, T, Bounded, Tick, N> {
) -> Optional<'a, T, W, C, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -360,7 +411,7 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
pub fn filter_map<U, F: Fn(T) -> Option<U> + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
) -> Optional<'a, U, Bounded, Tick, N> {
) -> Optional<'a, U, W, C, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
Expand All @@ -370,6 +421,17 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
},
)
}
}

impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
// TODO(shadaj): this is technically incorrect; we should only return the first element of the stream
pub fn into_stream(self) -> Stream<'a, T, Bounded, Tick, N> {
Stream::new(
self.location_kind,
self.ir_leaves,
self.ir_node.into_inner(),
)
}

pub fn cross_singleton<O>(
self,
Expand Down Expand Up @@ -436,6 +498,14 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
)
}

pub fn latest(self) -> Optional<'a, T, Unbounded, NoTick, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
HfPlusNode::Persist(Box::new(self.ir_node.into_inner())),
)
}

pub fn defer_tick(self) -> Optional<'a, T, Bounded, Tick, N> {
Optional::new(
self.location_kind,
Expand All @@ -460,3 +530,55 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> {
)
}
}

impl<'a, T, N: Location> Optional<'a, T, Unbounded, NoTick, N> {
pub fn latest_tick(self) -> Optional<'a, T, Bounded, Tick, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())),
)
}

pub fn tick_samples(self) -> Stream<'a, T, Unbounded, NoTick, N> {
self.latest_tick().all_ticks()
}

pub fn cross_singleton<O>(
self,
other: impl Into<Optional<'a, O, Unbounded, NoTick, N>>,
) -> Optional<'a, (T, O), Unbounded, NoTick, N>
where
O: Clone,
{
let other: Optional<'a, O, Unbounded, NoTick, N> = other.into();
if self.location_kind != other.location_kind {
panic!("cross_singleton must be called on streams on the same node");
}

self.latest_tick()
.cross_singleton(other.latest_tick())
.latest()
}

pub fn sample_every(
self,
duration: impl Quoted<'a, std::time::Duration> + Copy + 'a,
) -> Stream<'a, T, Unbounded, NoTick, N> {
let interval = duration.splice();

let samples = Stream::<'a, hydroflow::tokio::time::Instant, Bounded, Tick, N>::new(
self.location_kind,
self.ir_leaves.clone(),
HfPlusNode::Source {
source: HfPlusSource::Interval(interval.into()),
location_kind: self.location_kind,
},
);

self.latest_tick()
.continue_if(samples.first())
.latest()
.tick_samples()
}
}
45 changes: 44 additions & 1 deletion hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,10 @@ impl<'a, T, N: Location> Stream<'a, T, Bounded, Tick, N> {
)
}

pub fn persist(self) -> Stream<'a, T, Bounded, Tick, N> {
pub fn persist(self) -> Stream<'a, T, Bounded, Tick, N>
where
T: Clone,
{
Stream::new(
self.location_kind,
self.ir_leaves,
Expand Down Expand Up @@ -407,6 +410,13 @@ impl<'a, T, W, N: Location> Stream<'a, T, W, NoTick, N> {
)
}

pub fn tick_prefix(self) -> Stream<'a, T, Bounded, Tick, N>
where
T: Clone,
{
self.tick_batch().persist()
}

pub fn inspect<F: Fn(&T) + 'a>(
self,
f: impl IntoQuotedMut<'a, F>,
Expand Down Expand Up @@ -449,6 +459,39 @@ impl<'a, T, N: Location> Stream<'a, T, Unbounded, NoTick, N> {

self.tick_batch().continue_if(samples.first()).all_ticks()
}

pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, T)>(
self,
init: impl IntoQuotedMut<'a, I>,
comb: impl IntoQuotedMut<'a, F>,
) -> Singleton<'a, A, Unbounded, NoTick, N> {
// unbounded singletons are represented as a stream
// which produces all values from all ticks every tick,
// so delta will always give the lastest aggregation
Singleton::new(
self.location_kind,
self.ir_leaves,
HfPlusNode::Persist(Box::new(HfPlusNode::Fold {
init: init.splice().into(),
acc: comb.splice().into(),
input: Box::new(self.ir_node.into_inner()),
})),
)
}

pub fn reduce<F: Fn(&mut T, T) + 'a>(
self,
comb: impl IntoQuotedMut<'a, F>,
) -> Optional<'a, T, Unbounded, NoTick, N> {
Optional::new(
self.location_kind,
self.ir_leaves,
HfPlusNode::Persist(Box::new(HfPlusNode::Reduce {
f: comb.splice().into(),
input: Box::new(self.ir_node.into_inner()),
})),
)
}
}

impl<'a, T, C, N: Location> Stream<'a, T, Bounded, C, N> {
Expand Down
3 changes: 0 additions & 3 deletions hydroflow_plus_test/src/cluster/compute_pi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@ pub fn compute_pi(flow: &FlowBuilder, batch_size: usize) -> (Cluster<Worker>, Pr

trials
.send_bincode_interleaved(&process)
.tick_batch()
.persist()
.reduce(q!(|(inside, total), (inside_batch, total_batch)| {
*inside += inside_batch;
*total += total_batch;
}))
.all_ticks()
.sample_every(q!(Duration::from_secs(1)))
.for_each(q!(|(inside, total)| {
println!(
Expand Down
Loading
Loading