Skip to content

Commit

Permalink
refactor(hydroflow_plus)!: fold Tick vs NoTick into the location …
Browse files Browse the repository at this point in the history
…type parameter (#1519)

Now, when the location is a top-level `Process` or `Cluster` that
corresponds to a `NoTick`, and for streams inside a tick we wrap the
location type (e.g. `Tick<Process<...>>`). This simplifies type
signatures for a lot of our example code.
  • Loading branch information
shadaj authored Nov 4, 2024
1 parent 0a5abab commit 5657563
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 336 deletions.
7 changes: 4 additions & 3 deletions hydroflow_plus/src/cycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::marker::PhantomData;

use crate::builder::FlowState;
use crate::location::{Location, LocationId};
use crate::Tick;

pub struct TickCycle {}

pub trait DeferTick {
fn defer_tick(self) -> Self;
Expand Down Expand Up @@ -45,12 +46,12 @@ impl<'a, T, S: CycleComplete<'a, T>> HfForwardRef<'a, T, S> {
}
}

pub struct HfCycle<'a, S: CycleComplete<'a, Tick> + DeferTick> {
pub struct HfCycle<'a, S: CycleComplete<'a, TickCycle> + DeferTick> {
pub(crate) ident: syn::Ident,
pub(crate) _phantom: PhantomData<(&'a mut &'a (), S)>,
}

impl<'a, S: CycleComplete<'a, Tick> + DeferTick> HfCycle<'a, S> {
impl<'a, S: CycleComplete<'a, TickCycle> + DeferTick> HfCycle<'a, S> {
pub fn complete_next_tick(self, stream: S) {
let ident = self.ident;
S::complete(stream.defer_tick(), ident)
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod runtime_support {
}

pub mod stream;
pub use stream::{Bounded, NoTick, Stream, Tick, Unbounded};
pub use stream::{Bounded, Stream, Tick, Unbounded};

pub mod singleton;
pub use singleton::{Optional, Singleton};
Expand Down
77 changes: 45 additions & 32 deletions hydroflow_plus/src/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ use serde::{Deserialize, Serialize};
use stageleft::{q, quote_type, Quoted};

use super::builder::{ClusterIds, ClusterSelfId, FlowState};
use crate::cycle::{CycleCollection, CycleCollectionWithInitial, DeferTick, HfCycle};
use crate::cycle::{CycleCollection, CycleCollectionWithInitial, DeferTick, HfCycle, TickCycle};
use crate::ir::{HfPlusNode, HfPlusSource};
use crate::{Bounded, HfForwardRef, NoTick, Optional, Singleton, Stream, Tick, Unbounded};
use crate::stream::NoTick;
use crate::{Bounded, HfForwardRef, Optional, Singleton, Stream, Tick, Unbounded};

#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum LocationId {
Expand All @@ -28,9 +29,9 @@ pub trait Location<'a> {

fn flow_state(&self) -> &FlowState;

fn spin(&self) -> Stream<(), Unbounded, NoTick, Self>
fn spin(&self) -> Stream<(), Unbounded, Self>
where
Self: Sized,
Self: Sized + NoTick,
{
Stream::new(
self.id(),
Expand All @@ -45,9 +46,9 @@ pub trait Location<'a> {
fn spin_batch(
&self,
batch_size: impl Quoted<'a, usize> + Copy + 'a,
) -> Stream<(), Bounded, Tick, Self>
) -> Stream<(), Bounded, Tick<Self>>
where
Self: Sized,
Self: Sized + NoTick,
{
self.spin()
.flat_map(q!(move |_| 0..batch_size))
Expand All @@ -58,9 +59,9 @@ pub trait Location<'a> {
fn source_stream<T, E: FuturesStream<Item = T> + Unpin>(
&self,
e: impl Quoted<'a, E>,
) -> Stream<T, Unbounded, NoTick, Self>
) -> Stream<T, Unbounded, Self>
where
Self: Sized,
Self: Sized + NoTick,
{
let e = e.splice_untyped();

Expand All @@ -77,9 +78,9 @@ pub trait Location<'a> {
fn source_iter<T, E: IntoIterator<Item = T>>(
&self,
e: impl Quoted<'a, E>,
) -> Stream<T, Bounded, NoTick, Self>
) -> Stream<T, Bounded, Self>
where
Self: Sized,
Self: Sized + NoTick,
{
let e = e.splice_untyped();

Expand All @@ -93,9 +94,9 @@ pub trait Location<'a> {
)
}

fn singleton<T: Clone>(&self, e: impl Quoted<'a, T>) -> Singleton<T, Bounded, NoTick, Self>
fn singleton<T: Clone>(&self, e: impl Quoted<'a, T>) -> Singleton<T, Bounded, Self>
where
Self: Sized,
Self: Sized + NoTick,
{
let e_arr = q!([e]);
let e = e_arr.splice_untyped();
Expand All @@ -118,19 +119,19 @@ pub trait Location<'a> {
fn singleton_each_tick<T: Clone>(
&self,
e: impl Quoted<'a, T>,
) -> Singleton<T, Bounded, Tick, Self>
) -> Singleton<T, Bounded, Tick<Self>>
where
Self: Sized,
Self: Sized + NoTick,
{
self.singleton(e).latest_tick()
}

fn singleton_first_tick<T: Clone>(
&self,
e: impl Quoted<'a, T>,
) -> Optional<T, Bounded, Tick, Self>
) -> Optional<T, Bounded, Tick<Self>>
where
Self: Sized,
Self: Sized + NoTick,
{
let e_arr = q!([e]);
let e = e_arr.splice_untyped();
Expand All @@ -148,9 +149,9 @@ pub trait Location<'a> {
fn source_interval(
&self,
interval: impl Quoted<'a, Duration> + Copy + 'a,
) -> Stream<tokio::time::Instant, Unbounded, NoTick, Self>
) -> Stream<tokio::time::Instant, Unbounded, Self>
where
Self: Sized,
Self: Sized + NoTick,
{
self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(interval)
Expand All @@ -161,18 +162,21 @@ pub trait Location<'a> {
&self,
delay: impl Quoted<'a, Duration> + Copy + 'a,
interval: impl Quoted<'a, Duration> + Copy + 'a,
) -> Stream<tokio::time::Instant, Unbounded, NoTick, Self>
) -> Stream<tokio::time::Instant, Unbounded, Self>
where
Self: Sized,
Self: Sized + NoTick,
{
self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
)))
}

fn forward_ref<S: CycleCollection<'a, NoTick, Location = Self>>(
fn forward_ref<S: CycleCollection<'a, (), Location = Self>>(
&self,
) -> (HfForwardRef<'a, NoTick, S>, S) {
) -> (HfForwardRef<'a, (), S>, S)
where
Self: NoTick,
{
let next_id = {
let on_id = match self.id() {
LocationId::Process(id) => id,
Expand All @@ -199,9 +203,12 @@ pub trait Location<'a> {
)
}

fn tick_forward_ref<S: CycleCollection<'a, Tick, Location = Self>>(
fn tick_forward_ref<S: CycleCollection<'a, TickCycle, Location = Self>>(
&self,
) -> (HfForwardRef<'a, Tick, S>, S) {
) -> (HfForwardRef<'a, TickCycle, S>, S)
where
Self: NoTick,
{
let next_id = {
let on_id = match self.id() {
LocationId::Process(id) => id,
Expand All @@ -228,9 +235,12 @@ pub trait Location<'a> {
)
}

fn tick_cycle<S: CycleCollection<'a, Tick, Location = Self> + DeferTick>(
fn tick_cycle<S: CycleCollection<'a, TickCycle, Location = Self> + DeferTick>(
&self,
) -> (HfCycle<'a, S>, S) {
) -> (HfCycle<'a, S>, S)
where
Self: NoTick,
{
let next_id = {
let on_id = match self.id() {
LocationId::Process(id) => id,
Expand Down Expand Up @@ -258,11 +268,14 @@ pub trait Location<'a> {
}

fn tick_cycle_with_initial<
S: CycleCollectionWithInitial<'a, Tick, Location = Self> + DeferTick,
S: CycleCollectionWithInitial<'a, TickCycle, Location = Self> + DeferTick,
>(
&self,
initial: S,
) -> (HfCycle<'a, S>, S) {
) -> (HfCycle<'a, S>, S)
where
Self: NoTick,
{
let next_id = {
let on_id = match self.id() {
LocationId::Process(id) => id,
Expand Down Expand Up @@ -336,10 +349,10 @@ impl<'a, P> Location<'a> for ExternalProcess<'a, P> {
}

impl<'a, P> ExternalProcess<'a, P> {
pub fn source_external_bytes<L: Location<'a>>(
pub fn source_external_bytes<L: Location<'a> + NoTick>(
&self,
to: &L,
) -> (ExternalBytesPort, Stream<Bytes, Unbounded, NoTick, L>) {
) -> (ExternalBytesPort, Stream<Bytes, Unbounded, L>) {
let next_external_port_id = {
let mut flow_state = self.flow_state.borrow_mut();
let id = flow_state.next_external_out;
Expand Down Expand Up @@ -372,10 +385,10 @@ impl<'a, P> ExternalProcess<'a, P> {
)
}

pub fn source_external_bincode<L: Location<'a>, T: Serialize + DeserializeOwned>(
pub fn source_external_bincode<L: Location<'a> + NoTick, T: Serialize + DeserializeOwned>(
&self,
to: &L,
) -> (ExternalBincodeSink<T>, Stream<T, Unbounded, NoTick, L>) {
) -> (ExternalBincodeSink<T>, Stream<T, Unbounded, L>) {
let next_external_port_id = {
let mut flow_state = self.flow_state.borrow_mut();
let id = flow_state.next_external_out;
Expand Down
Loading

0 comments on commit 5657563

Please sign in to comment.