Skip to content

Commit

Permalink
feat(hydroflow_plus)!: mark non-deterministic operators as unsafe and…
Browse files Browse the repository at this point in the history
… introduce timestamped streams (#1584)

Big PR.

First big change is we introduce a `Timestamped` location. This is a bit
of a hybrid between top-level locations and `Tick` locations. The idea
is that you choose where timestamps are generated, and then have a
guarantee that everything after that will be atomically computed (useful
for making sure we add payloads to the log before ack-ing).

The contract is that an operator or module that takes a `Timestamped`
input must still be deterministic regardless of the stamps on messages
(which are hidden unless you `tick_batch`). But unlike a top-level
stream (which has the same constraints), you have the atomicity
guarantee. Right now the guarantee is trivial, since we have one global
tick for everything. But in the future, when we want to apply
@davidchuyaya's optimizations, it will help us know where there are
causal dependencies on when data can be sent to others.

Second change is we mark every non-deterministic operator (modulo
explicit annotations such as `NoOrder`) with Rust's `unsafe` keyword.
This makes it super clear where non-determinism is taking place.

I've used this to put `unsafe` blocks throughout our example code and
add `SAFETY` annotations that argue why the non-determinism is safe (or
point out that we've explicitly documented / expect non-determinism). I
also added `#![warn(unsafe_op_in_unsafe_fn)]` to the examples and the
template, since this forces good hygiene of annotating sources of
non-determinism even inside a module that is intentionally
non-deterministic.

Paxos changes are mostly refactors, and I verified that the performance
is the same as before.
  • Loading branch information
shadaj authored Nov 27, 2024
1 parent ec55910 commit 9393899
Show file tree
Hide file tree
Showing 30 changed files with 1,481 additions and 644 deletions.
9 changes: 9 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@
}
}
],
"editor.semanticTokenColorCustomizations": {
"enabled": true,
"rules": {
"*.unsafe:rust": {
"foreground": "#ea1708",
"fontStyle": "bold"
}
}
},
"files.watcherExclude": {
"**/target": true
},
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ opt-level = "s"

[workspace.lints.rust]
unused_qualifications = "warn"
unsafe_op_in_unsafe_fn = "warn"

[workspace.lints.clippy]
allow_attributes = "warn"
Expand Down
1 change: 1 addition & 0 deletions hydro_deploy/hydro_cli/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![expect(
unused_qualifications,
non_local_definitions,
unsafe_op_in_unsafe_fn,
reason = "for pyo3 generated code"
)]

Expand Down
10 changes: 6 additions & 4 deletions hydroflow_plus/src/cycle.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::location::Location;
use crate::location::{Location, LocationId};
use crate::staging_util::Invariant;

pub enum ForwardRefMarker {}
Expand All @@ -9,7 +9,7 @@ pub trait DeferTick {
}

pub trait CycleComplete<'a, T> {
fn complete(self, ident: syn::Ident);
fn complete(self, ident: syn::Ident, expected_location: LocationId);
}

pub trait CycleCollection<'a, T>: CycleComplete<'a, T> {
Expand All @@ -30,24 +30,26 @@ pub trait CycleCollectionWithInitial<'a, T>: CycleComplete<'a, T> {
/// See [`crate::FlowBuilder`] for an explainer on the type parameters.
pub struct ForwardRef<'a, S: CycleComplete<'a, ForwardRefMarker>> {
// Identifier of the cycle; handed to `S::complete` when the reference is completed.
pub(crate) ident: syn::Ident,
// Id of the location that created this reference (set from that location's `id()`),
// forwarded to `S::complete`.
// NOTE(review): presumably used there to check that the completing collection
// lives at the same location -- confirm against the `CycleComplete` impls.
pub(crate) expected_location: LocationId,
// Zero-sized marker (constructed as `PhantomData`) tying the handle to `'a` and `S`.
pub(crate) _phantom: Invariant<'a, S>,
}

impl<'a, S: CycleComplete<'a, ForwardRefMarker>> ForwardRef<'a, S> {
pub fn complete(self, stream: S) {
let ident = self.ident;
S::complete(stream, ident)
S::complete(stream, ident, self.expected_location)
}
}

/// Handle for a cycle inside a tick.
///
/// It is completed with `complete_next_tick`, which applies `defer_tick` to
/// the supplied collection before passing it to `S::complete`.
pub struct TickCycle<'a, S: CycleComplete<'a, TickCycleMarker> + DeferTick> {
// Identifier of the cycle; handed to `S::complete` on completion.
pub(crate) ident: syn::Ident,
// Id of the tick that created this cycle (set from the tick's `id()`),
// forwarded to `S::complete`.
// NOTE(review): presumably used there to validate the completing collection's
// location -- confirm against the `CycleComplete` impls.
pub(crate) expected_location: LocationId,
// Zero-sized marker (constructed as `PhantomData`) tying the handle to `'a` and `S`.
pub(crate) _phantom: Invariant<'a, S>,
}

impl<'a, S: CycleComplete<'a, TickCycleMarker> + DeferTick> TickCycle<'a, S> {
pub fn complete_next_tick(self, stream: S) {
let ident = self.ident;
S::complete(stream.defer_tick(), ident)
S::complete(stream.defer_tick(), ident, self.expected_location)
}
}
12 changes: 7 additions & 5 deletions hydroflow_plus/src/location/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@ pub struct Cluster<'a, C> {

pub trait IsCluster {
type Tag;
fn id(&self) -> usize;
}

impl<C> IsCluster for Cluster<'_, C> {
type Tag = C;
fn id(&self) -> usize {
self.id
}
}

impl<'a, C> Cluster<'a, C> {
Expand Down Expand Up @@ -125,8 +121,14 @@ where
where
Self: Sized,
{
let cluster_id = if let LocationId::Cluster(id) = ctx.root().id() {
id
} else {
unreachable!()
};

let ident = syn::Ident::new(
&format!("__hydroflow_plus_cluster_self_id_{}", ctx.root().id()),
&format!("__hydroflow_plus_cluster_self_id_{}", cluster_id),
Span::call_site(),
);
let root = get_this_crate();
Expand Down
26 changes: 23 additions & 3 deletions hydroflow_plus/src/location/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
}

pub trait Location<'a>: Clone {
type Root;
type Root: Location<'a>;

fn root(&self) -> Self::Root;

Expand Down Expand Up @@ -160,7 +160,16 @@ pub trait Location<'a>: Clone {
)
}

fn source_interval(
/// Generates a stream with values emitted at a fixed interval, with
/// each value being the current time (as a [`tokio::time::Instant`]).
///
/// The clock source used is monotonic, so elements will be emitted in
/// increasing order.
///
/// # Safety
/// This stream is generated by an OS timer, so it is non-deterministic:
/// each timestamp will be arbitrary.
unsafe fn source_interval(
&self,
interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
) -> Stream<tokio::time::Instant, Self, Unbounded>
Expand All @@ -172,7 +181,17 @@ pub trait Location<'a>: Clone {
)))
}

fn source_interval_delayed(
/// Generates a stream with values emitted at a fixed interval (with an
/// initial delay), with each value being the current time
/// (as a [`tokio::time::Instant`]).
///
/// The clock source used is monotonic, so elements will be emitted in
/// increasing order.
///
/// # Safety
/// This stream is generated by an OS timer, so it is non-deterministic:
/// each timestamp will be arbitrary.
unsafe fn source_interval_delayed(
&self,
delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
Expand Down Expand Up @@ -212,6 +231,7 @@ pub trait Location<'a>: Clone {
(
ForwardRef {
ident: ident.clone(),
expected_location: self.id(),
_phantom: PhantomData,
},
S::create_source(ident, self.clone()),
Expand Down
97 changes: 92 additions & 5 deletions hydroflow_plus/src/location/tick.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::marker::PhantomData;

use proc_macro2::Span;
use sealed::sealed;
use stageleft::{q, QuotedWithContext};

use super::{Cluster, Location, LocationId, Process};
Expand All @@ -12,10 +13,50 @@ use crate::cycle::{
use crate::ir::{HfPlusNode, HfPlusSource};
use crate::{Bounded, Optional, Singleton, Stream};

/// Marker trait used as a bound (`L: NoTick`) on [`Tick`] methods that need
/// an outer location which is not itself a tick.
///
/// It is implemented for [`Process`], [`Cluster`] and [`Timestamped`], but
/// not for [`Tick`]. The trait is `#[sealed]`, so implementations are
/// restricted to this crate.
#[sealed]
pub trait NoTick {}
// Processes are never tick locations.
#[sealed]
impl<T> NoTick for Process<'_, T> {}
// Clusters are never tick locations.
#[sealed]
impl<T> NoTick for Cluster<'_, T> {}

/// Marker trait for locations that are not [`Timestamped`].
///
/// It is implemented for [`Process`], [`Cluster`] and [`Tick`]; `Timestamped`
/// deliberately has no impl. The trait is `#[sealed]`, so implementations are
/// restricted to this crate.
#[sealed]
pub trait NoTimestamp {}
// Plain processes carry no timestamps.
#[sealed]
impl<T> NoTimestamp for Process<'_, T> {}
// Plain clusters carry no timestamps.
#[sealed]
impl<T> NoTimestamp for Cluster<'_, T> {}
// A tick over any location counts as un-timestamped.
#[sealed]
impl<'a, L: Location<'a>> NoTimestamp for Tick<L> {}

/// A location that wraps a [`Tick`].
///
/// The change that introduced it describes it as a hybrid between top-level
/// locations and `Tick` locations: everything computed after the point where
/// timestamps are generated is guaranteed to be atomic, while the stamps
/// themselves stay hidden unless the stream is batched into the tick.
#[derive(Clone)]
pub struct Timestamped<L> {
// The tick this location is bound to; all `Location` instance queries are answered by it.
pub(crate) tick: Tick<L>,
}

// `Timestamped<L>` is a location in its own right; every instance-level query
// below is answered by the wrapped tick.
impl<'a, L: Location<'a>> Location<'a> for Timestamped<L> {
// Same root type as the underlying location `L`.
type Root = L::Root;

// Root of the wrapped tick.
fn root(&self) -> Self::Root {
self.tick.root()
}

// Reports the wrapped tick's id, so this location and its tick share one `LocationId`.
fn id(&self) -> LocationId {
self.tick.id()
}

// Flow state of the wrapped tick.
fn flow_state(&self) -> &FlowState {
self.tick.flow_state()
}

// Defers to `L` itself rather than to `Tick<L>`: the answer is whatever the
// underlying location reports.
fn is_top_level() -> bool {
L::is_top_level()
}
}

// A timestamped location satisfies `NoTick` bounds for any `L` (note the impl
// places no bound on `L`), so it can be used where a non-tick location is required.
#[sealed]
impl<L> NoTick for Timestamped<L> {}

/// Marks the stream as being inside the single global clock domain.
#[derive(Clone)]
pub struct Tick<L> {
Expand Down Expand Up @@ -53,13 +94,20 @@ impl<'a, L: Location<'a>> Tick<L> {
batch_size: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
) -> Stream<(), Self, Bounded>
where
L: NoTick,
L: NoTick + NoTimestamp,
{
self.l
let out = self
.l
.spin()
.flat_map(q!(move |_| 0..batch_size))
.flat_map_ordered(q!(move |_| 0..batch_size))
.map(q!(|_| ()))
.tick_batch(self)
.timestamped(self);

unsafe {
// SAFETY: at runtime, `spin` produces a single value per tick,
// so each batch is guaranteed to be the same size.
out.tick_batch()
}
}

pub fn singleton<T: Clone>(
Expand All @@ -69,7 +117,10 @@ impl<'a, L: Location<'a>> Tick<L> {
where
L: NoTick,
{
self.outer().singleton(e).latest_tick(self)
unsafe {
// SAFETY: a top-level singleton produces the same value each tick
self.outer().singleton(e).timestamped(self).latest_tick()
}
}

pub fn singleton_first_tick<T: Clone>(
Expand Down Expand Up @@ -118,12 +169,46 @@ impl<'a, L: Location<'a>> Tick<L> {
(
ForwardRef {
ident: ident.clone(),
expected_location: self.id(),
_phantom: PhantomData,
},
S::create_source(ident, self.clone()),
)
}

/// Creates a forward reference to a collection located at the
/// [`Timestamped`] location of this tick.
///
/// Returns a pair of:
/// - the [`ForwardRef`] handle, which records the generated cycle identifier
///   and this tick's id as the expected location, and
/// - a placeholder collection built by `S::create_source` at
///   `Timestamped { tick: self.clone() }`.
///
/// The cycle identifier is `cycle_{n}`, where `n` is the next value of the
/// per-location counter kept in the flow state's `cycle_counts`.
///
/// # Panics
/// Panics if this tick's outer location is neither a process nor a cluster.
pub fn forward_ref_timestamped<
    S: CycleCollection<'a, ForwardRefMarker, Location = Timestamped<L>>,
>(
    &self,
) -> (ForwardRef<'a, S>, S) {
    // The mutable borrow of the flow state is confined to this block so that
    // it is dropped before `S::create_source` is called.
    let next_id = {
        // Cycle counters are keyed by the raw id of the tick's outer location.
        let on_id = match self.l.id() {
            LocationId::Process(id) | LocationId::Cluster(id) => id,
            LocationId::Tick(_, _) => panic!(
                "forward_ref_timestamped: the tick's outer location must be a process or cluster, found a tick"
            ),
            LocationId::ExternalProcess(_) => panic!(
                "forward_ref_timestamped: the tick's outer location must be a process or cluster, found an external process"
            ),
        };

        let mut flow_state = self.flow_state().borrow_mut();
        let counter = flow_state.cycle_counts.entry(on_id).or_default();

        // Hand out the current value and advance the counter.
        let id = *counter;
        *counter += 1;
        id
    };

    let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

    (
        ForwardRef {
            ident: ident.clone(),
            expected_location: self.id(),
            _phantom: PhantomData,
        },
        S::create_source(ident, Timestamped { tick: self.clone() }),
    )
}

pub fn cycle<S: CycleCollection<'a, TickCycleMarker, Location = Self> + DeferTick>(
&self,
) -> (TickCycle<'a, S>, S)
Expand Down Expand Up @@ -151,6 +236,7 @@ impl<'a, L: Location<'a>> Tick<L> {
(
TickCycle {
ident: ident.clone(),
expected_location: self.id(),
_phantom: PhantomData,
},
S::create_source(ident, self.clone()),
Expand Down Expand Up @@ -187,6 +273,7 @@ impl<'a, L: Location<'a>> Tick<L> {
(
TickCycle {
ident: ident.clone(),
expected_location: self.id(),
_phantom: PhantomData,
},
S::create_source(ident, initial, self.clone()),
Expand Down
Loading

0 comments on commit 9393899

Please sign in to comment.