Skip to content

Commit

Permalink
feat(hydroflow_plus)!: separate singletons into their own types (hydr…
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj authored Aug 27, 2024
1 parent 22c7218 commit 44c6b14
Show file tree
Hide file tree
Showing 9 changed files with 689 additions and 224 deletions.
57 changes: 3 additions & 54 deletions hydroflow_plus/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use quote::quote;
use runtime_support::FreeVariable;
use stageleft::*;

use crate::cycle::CycleCollection;
use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource};
use crate::location::{Cluster, Location, LocationId, Process};
use crate::stream::{Bounded, NoTick, Tick, Unbounded};
Expand Down Expand Up @@ -277,50 +278,7 @@ impl<'a> FlowBuilder<'a> {
)
}

#[allow(clippy::type_complexity)]
pub fn cycle<T, W, L: Location>(
&self,
on: &L,
) -> (HfCycle<'a, T, W, NoTick, L>, Stream<'a, T, W, NoTick, L>) {
let next_id = {
let on_id = match on.id() {
LocationId::Process(id) => id,
LocationId::Cluster(id) => id,
};

let mut cycle_ids = self.cycle_ids.borrow_mut();
let next_id_entry = cycle_ids.entry(on_id).or_default();

let id = *next_id_entry;
*next_id_entry += 1;
id
};

let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

(
HfCycle {
ident: ident.clone(),
location_kind: on.id(),
ir_leaves: self.ir_leaves().clone(),
_phantom: PhantomData,
},
Stream::new(
on.id(),
self.ir_leaves().clone(),
HfPlusNode::Persist(Box::new(HfPlusNode::CycleSource {
ident,
location_kind: on.id(),
})),
),
)
}

#[allow(clippy::type_complexity)]
pub fn tick_cycle<T, W, L: Location>(
&self,
on: &L,
) -> (HfCycle<'a, T, W, Tick, L>, Stream<'a, T, W, Tick, L>) {
pub fn cycle<S: CycleCollection<'a>>(&self, on: &S::Location) -> (HfCycle<'a, S>, S) {
let next_id = {
let on_id = match on.id() {
LocationId::Process(id) => id,
Expand All @@ -340,18 +298,9 @@ impl<'a> FlowBuilder<'a> {
(
HfCycle {
ident: ident.clone(),
location_kind: on.id(),
ir_leaves: self.ir_leaves().clone(),
_phantom: PhantomData,
},
Stream::new(
on.id(),
self.ir_leaves().clone(),
HfPlusNode::CycleSource {
ident,
location_kind: on.id(),
},
),
S::create_source(ident, self.ir_leaves.clone(), on.id()),
)
}
}
44 changes: 14 additions & 30 deletions hydroflow_plus/src/cycle.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,28 @@
use std::marker::PhantomData;

use crate::builder::FlowLeaves;
use crate::ir::{HfPlusLeaf, HfPlusNode};
use crate::location::{Location, LocationId};
use crate::stream::{NoTick, Tick};
use crate::Stream;

pub trait CycleCollection<'a> {
type Location: Location;

fn create_source(ident: syn::Ident, ir_leaves: FlowLeaves<'a>, l: LocationId) -> Self;

fn complete(self, ident: syn::Ident);
}

/// Represents a fixpoint cycle in the graph that will be fulfilled
/// by a stream that is not yet known.
///
/// See [`Stream`] for an explainer on the type parameters.
pub struct HfCycle<'a, T, W, C, N: Location> {
/// See [`crate::FlowBuilder`] for an explainer on the type parameters.
pub struct HfCycle<'a, S: CycleCollection<'a>> {
pub(crate) ident: syn::Ident,
pub(crate) location_kind: LocationId,
pub(crate) ir_leaves: FlowLeaves<'a>,
pub(crate) _phantom: PhantomData<(N, &'a mut &'a (), T, W, C)>,
}

impl<'a, T, W, N: Location> HfCycle<'a, T, W, Tick, N> {
pub fn complete(self, stream: Stream<'a, T, W, Tick, N>) {
let ident = self.ident;

self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a cycle to a flow that has already been finalized. No cycles can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
ident,
location_kind: self.location_kind,
input: Box::new(stream.ir_node.into_inner()),
});
}
pub(crate) _phantom: PhantomData<(&'a mut &'a (), S)>,
}

impl<'a, T, W, N: Location> HfCycle<'a, T, W, NoTick, N> {
pub fn complete(self, stream: Stream<'a, T, W, NoTick, N>) {
impl<'a, S: CycleCollection<'a>> HfCycle<'a, S> {
pub fn complete(self, stream: S) {
let ident = self.ident;

self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a cycle to a flow that has already been finalized. No cycles can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
ident,
location_kind: self.location_kind,
input: Box::new(HfPlusNode::Unpersist(
Box::new(stream.ir_node.into_inner())
)),
});
S::complete(stream, ident)
}
}
5 changes: 4 additions & 1 deletion hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ pub mod runtime_support {
}

pub mod stream;
pub use stream::Stream;
pub use stream::{Bounded, NoTick, Stream, Tick, Unbounded};

pub mod singleton;
pub use singleton::{Optional, Singleton};

pub mod location;
pub use location::{Cluster, Process};
Expand Down
Loading

0 comments on commit 44c6b14

Please sign in to comment.