Skip to content

Commit 8306827

Browse files
committed
feat(hydroflow_plus)!: separate singletons into their own types
1 parent 536e644 commit 8306827

File tree

9 files changed

+689
-224
lines changed

9 files changed

+689
-224
lines changed

hydroflow_plus/src/builder/mod.rs

+3-54
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use quote::quote;
1212
use runtime_support::FreeVariable;
1313
use stageleft::*;
1414

15+
use crate::cycle::CycleCollection;
1516
use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource};
1617
use crate::location::{Cluster, Location, LocationId, Process};
1718
use crate::stream::{Bounded, NoTick, Tick, Unbounded};
@@ -277,50 +278,7 @@ impl<'a> FlowBuilder<'a> {
277278
)
278279
}
279280

280-
#[allow(clippy::type_complexity)]
281-
pub fn cycle<T, W, L: Location>(
282-
&self,
283-
on: &L,
284-
) -> (HfCycle<'a, T, W, NoTick, L>, Stream<'a, T, W, NoTick, L>) {
285-
let next_id = {
286-
let on_id = match on.id() {
287-
LocationId::Process(id) => id,
288-
LocationId::Cluster(id) => id,
289-
};
290-
291-
let mut cycle_ids = self.cycle_ids.borrow_mut();
292-
let next_id_entry = cycle_ids.entry(on_id).or_default();
293-
294-
let id = *next_id_entry;
295-
*next_id_entry += 1;
296-
id
297-
};
298-
299-
let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());
300-
301-
(
302-
HfCycle {
303-
ident: ident.clone(),
304-
location_kind: on.id(),
305-
ir_leaves: self.ir_leaves().clone(),
306-
_phantom: PhantomData,
307-
},
308-
Stream::new(
309-
on.id(),
310-
self.ir_leaves().clone(),
311-
HfPlusNode::Persist(Box::new(HfPlusNode::CycleSource {
312-
ident,
313-
location_kind: on.id(),
314-
})),
315-
),
316-
)
317-
}
318-
319-
#[allow(clippy::type_complexity)]
320-
pub fn tick_cycle<T, W, L: Location>(
321-
&self,
322-
on: &L,
323-
) -> (HfCycle<'a, T, W, Tick, L>, Stream<'a, T, W, Tick, L>) {
281+
pub fn cycle<S: CycleCollection<'a>>(&self, on: &S::Location) -> (HfCycle<'a, S>, S) {
324282
let next_id = {
325283
let on_id = match on.id() {
326284
LocationId::Process(id) => id,
@@ -340,18 +298,9 @@ impl<'a> FlowBuilder<'a> {
340298
(
341299
HfCycle {
342300
ident: ident.clone(),
343-
location_kind: on.id(),
344-
ir_leaves: self.ir_leaves().clone(),
345301
_phantom: PhantomData,
346302
},
347-
Stream::new(
348-
on.id(),
349-
self.ir_leaves().clone(),
350-
HfPlusNode::CycleSource {
351-
ident,
352-
location_kind: on.id(),
353-
},
354-
),
303+
S::create_source(ident, self.ir_leaves.clone(), on.id()),
355304
)
356305
}
357306
}

hydroflow_plus/src/cycle.rs

+14-30
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,28 @@
11
use std::marker::PhantomData;
22

33
use crate::builder::FlowLeaves;
4-
use crate::ir::{HfPlusLeaf, HfPlusNode};
54
use crate::location::{Location, LocationId};
6-
use crate::stream::{NoTick, Tick};
7-
use crate::Stream;
5+
6+
pub trait CycleCollection<'a> {
7+
type Location: Location;
8+
9+
fn create_source(ident: syn::Ident, ir_leaves: FlowLeaves<'a>, l: LocationId) -> Self;
10+
11+
fn complete(self, ident: syn::Ident);
12+
}
813

914
/// Represents a fixpoint cycle in the graph that will be fulfilled
1015
/// by a stream that is not yet known.
1116
///
12-
/// See [`Stream`] for an explainer on the type parameters.
13-
pub struct HfCycle<'a, T, W, C, N: Location> {
17+
/// See [`crate::FlowBuilder`] for an explainer on the type parameters.
18+
pub struct HfCycle<'a, S: CycleCollection<'a>> {
1419
pub(crate) ident: syn::Ident,
15-
pub(crate) location_kind: LocationId,
16-
pub(crate) ir_leaves: FlowLeaves<'a>,
17-
pub(crate) _phantom: PhantomData<(N, &'a mut &'a (), T, W, C)>,
18-
}
19-
20-
impl<'a, T, W, N: Location> HfCycle<'a, T, W, Tick, N> {
21-
pub fn complete(self, stream: Stream<'a, T, W, Tick, N>) {
22-
let ident = self.ident;
23-
24-
self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a cycle to a flow that has already been finalized. No cycles can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
25-
ident,
26-
location_kind: self.location_kind,
27-
input: Box::new(stream.ir_node.into_inner()),
28-
});
29-
}
20+
pub(crate) _phantom: PhantomData<(&'a mut &'a (), S)>,
3021
}
3122

32-
impl<'a, T, W, N: Location> HfCycle<'a, T, W, NoTick, N> {
33-
pub fn complete(self, stream: Stream<'a, T, W, NoTick, N>) {
23+
impl<'a, S: CycleCollection<'a>> HfCycle<'a, S> {
24+
pub fn complete(self, stream: S) {
3425
let ident = self.ident;
35-
36-
self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a cycle to a flow that has already been finalized. No cycles can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink {
37-
ident,
38-
location_kind: self.location_kind,
39-
input: Box::new(HfPlusNode::Unpersist(
40-
Box::new(stream.ir_node.into_inner())
41-
)),
42-
});
26+
S::complete(stream, ident)
4327
}
4428
}

hydroflow_plus/src/lib.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ pub mod runtime_support {
1919
}
2020

2121
pub mod stream;
22-
pub use stream::Stream;
22+
pub use stream::{Bounded, NoTick, Stream, Tick, Unbounded};
23+
24+
pub mod singleton;
25+
pub use singleton::{Optional, Singleton};
2326

2427
pub mod location;
2528
pub use location::{Cluster, Process};

0 commit comments

Comments
 (0)