refactor(hydro_lang)!: rename _interleaved to _anonymous #1695

Merged
merged 1 commit into from
Feb 3, 2025
38 changes: 22 additions & 16 deletions docs/docs/hydro/live-collections/streams.md
@@ -3,7 +3,7 @@ sidebar_position: 2
---

# Streams
-Streams are the most common type of live collection in Hydro; they can be used to model streaming data collections, a feed of API requests, or even time-based intervals. A `Stream` represents a sequence of elements, with new elements being asynchronously appended to the end of the sequence. Streams can be transformed using APIs like `map` and `filter`, based on Rust [iterators](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html). You can view the full API documentation for Streams [here](pathname:///rustdoc/hydro_lang/stream/struct.Stream).
+Streams are the most common type of live collection in Hydro; they can be used to model streaming data collections or a feed of API requests. A `Stream` represents a sequence of elements, with new elements being asynchronously appended to the end of the sequence. Streams can be transformed using APIs like `map` and `filter`, based on Rust [iterators](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html). You can view the full API documentation for Streams [here](pathname:///rustdoc/hydro_lang/stream/struct.Stream).

Streams have several type parameters:
- `T`: the type of elements in the stream
@@ -13,14 +13,14 @@ Streams have several type parameters:
- This type parameter is _optional_; by default the order is deterministic

## Creating a Stream
-The simplest way to create a stream is to use the [`source_iter`](https://hydro.run/rustdoc/hydro_lang/location/trait.Location#method.source_iter) method on a location, which creates a stream from any Rust type that can be converted into an [`Iterator`](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html) (via [`IntoIterator`](https://doc.rust-lang.org/std/iter/trait.IntoIterator.html)). For example, we can create a stream of integers on a [process](../locations/processes.md) and transform it:
+The simplest way to create a stream is to use [`Location::source_iter`](https://hydro.run/rustdoc/hydro_lang/location/trait.Location#method.source_iter), which creates a stream from any Rust type that can be converted into an [`Iterator`](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html) (via [`IntoIterator`](https://doc.rust-lang.org/std/iter/trait.IntoIterator.html)). For example, we can create a stream of integers on a [process](../locations/processes.md) and transform it:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, p_out| {
let process = flow.process::<()>();
-let numbers: Stream<_, Process<_>, _> = process
+let numbers: Stream<_, Process<_>, Unbounded> = process
.source_iter(q!(vec![1, 2, 3]))
.map(q!(|x| x + 1));
// 2, 3, 4
@@ -32,16 +32,16 @@ let numbers: Stream<_, Process<_>, _> = process
# }));
```

-Streams also can be sent over the network to form distributed programs. Under the hood, sending a stream sets up an RPC handler at the target location that will receive the stream elements. For example, we can send a stream of integers from one process to another with [bincode](https://docs.rs/bincode/latest/bincode/) serialization:
+Streams also can be sent over the network to participate in distributed programs. Under the hood, sending a stream sets up an RPC handler at the target location that will receive the stream elements. For example, we can send a stream of integers from one process to another with [bincode](https://docs.rs/bincode/latest/bincode/) serialization:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, p_out| {
let p1 = flow.process::<()>();
-let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![1, 2, 3]));
+let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
let p2 = flow.process::<()>();
-let on_p2: Stream<_, Process<_>, _> = numbers.send_bincode(&p2);
+let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
// 1, 2, 3
# on_p2.send_bincode(&p_out)
# }, |mut stream| async move {
@@ -62,41 +62,41 @@ If we send a stream from a cluster to a process, the return type will be a strea
# use hydro_lang::*;
# let flow = FlowBuilder::new();
let workers: Cluster<()> = flow.cluster::<()>();
-let numbers: Stream<_, Cluster<_>, _, TotalOrder> =
+let numbers: Stream<_, Cluster<_>, Unbounded, TotalOrder> =
workers.source_iter(q!(vec![1, 2, 3]));
let process: Process<()> = flow.process::<()>();
-let on_p2: Stream<_, Process<_>, _, NoOrder> =
+let on_p2: Stream<_, Process<_>, Unbounded, NoOrder> =
numbers.send_bincode(&process);
```

The ordering of a stream determines which APIs are available on it. For example, `map` and `filter` are available on all streams, but `last` is only available on streams with `TotalOrder`. This ensures that even when the network introduces non-determinism, the program will not compile if it tries to use an API that requires a deterministic order.
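
As a rough illustration of how an ordering type parameter can gate which methods exist, here is a minimal plain-Rust sketch. It does not use Hydro and is not Hydro's implementation: `ToyStream`, `from_vec`, and `into_unordered` are hypothetical names invented for this example, and the marker types only borrow the `TotalOrder` / `NoOrder` names from the text above.

```rust
use std::marker::PhantomData;

// Hypothetical marker types for this sketch; they borrow the names used in the
// docs but are not Hydro's definitions.
#[allow(dead_code)]
struct TotalOrder;
#[allow(dead_code)]
struct NoOrder;

// A toy "stream" that carries its ordering guarantee as a type parameter.
struct ToyStream<T, Order> {
    items: Vec<T>,
    _order: PhantomData<Order>,
}

// Methods in this impl are available for every ordering.
impl<T, Order> ToyStream<T, Order> {
    fn map<U>(self, f: impl Fn(T) -> U) -> ToyStream<U, Order> {
        ToyStream {
            items: self.items.into_iter().map(f).collect(),
            _order: PhantomData,
        }
    }

    fn count(self) -> usize {
        self.items.len()
    }
}

// Methods in this impl exist only when the order is total.
impl<T> ToyStream<T, TotalOrder> {
    fn from_vec(items: Vec<T>) -> Self {
        ToyStream { items, _order: PhantomData }
    }

    fn last(self) -> Option<T> {
        self.items.into_iter().last()
    }

    // Stands in for a network hop that may reorder elements: the data is
    // unchanged, but the ordering guarantee is weakened in the type.
    fn into_unordered(self) -> ToyStream<T, NoOrder> {
        ToyStream { items: self.items, _order: PhantomData }
    }
}

fn main() {
    let ordered = ToyStream::from_vec(vec![1, 2, 3]).map(|x| x + 1);
    println!("{:?}", ordered.last()); // Some(4)

    let unordered = ToyStream::from_vec(vec![1, 2, 3]).into_unordered();
    println!("{}", unordered.count()); // 3
    // Calling `.last()` on the unordered value would be rejected at compile
    // time, because no `last` method is defined for `ToyStream<_, NoOrder>`.
}
```

The point of the sketch is only that the restriction lives in the type system: the order-sensitive method is never defined for the weaker marker, so misuse fails at compile time rather than at run time.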

-A particularly common API that faces this restriction is [`fold`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold) (and [`reduce`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.reduce)). These APIs require the stream to have a deterministic order, since the aggregation may depend on the order of elements. For example, the following code will not compile because `fold` is not available on `NoOrder` streams. The error is a bit misleading, but the key part is that `fold` is not available on `NoOrder` streams:
+A particularly common API that faces this restriction is [`fold`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold) (and [`reduce`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.reduce)). These APIs require the stream to have a deterministic order, since the result may depend on the order of elements. For example, the following code will not compile because `fold` is not available on `NoOrder` streams (note that the error is a bit misleading due to the Rust compiler attempting to apply `Iterator` methods):

```compile_fail
# use hydro_lang::*;
# let flow = FlowBuilder::new();
let workers: Cluster<()> = flow.cluster::<()>();
let process: Process<()> = flow.process::<()>();
-let all_words: Stream<_, Process<_>, _, NoOrder> = workers
+let all_words: Stream<_, Process<_>, Unbounded, NoOrder> = workers
.source_iter(q!(vec!["hello", "world"]))
.map(q!(|x| x.to_string()))
-.send_bincode_interleaved(&process);
+.send_bincode_anonymous(&process);

let words_concat = all_words
.fold(q!(|| "".to_string()), q!(|acc, x| acc += x));
-// ^^^^ `hydro_lang::Stream<&str, hydro_lang::Process<'_>, hydro_lang::Unbounded, NoOrder>` is not an iterator
+// ^^^^ error: `hydro_lang::Stream<String, hydro_lang::Process<'_>, hydro_lang::Unbounded, NoOrder>` is not an iterator
```

:::tip

-We use `send_bincode_interleaved` here to drop the cluster IDs which are included in `send_bincode`. See [Clusters](../locations/clusters.md) for more details.
+We use `send_bincode_anonymous` here to drop the cluster IDs which are included in `send_bincode`. See [Clusters](../locations/clusters.md) for more details.

-You'll notice that we aggregated an **asynchronously** updated stream, so the result is a `Singleton` live collection. For more details on the semantics of singletons, including how they are updated when new inputs arrive, see [Singletons and Optionals](./singletons-optionals.md).
+Running an aggregation (`fold`, `reduce`) converts a `Stream` into a `Singleton`, as we see in the type signature here. The `Singleton` type is still "live" in the sense of a [Live Collection](./index.md), so updates to the `Stream` input cause updates to the `Singleton` output. See [Singletons and Optionals](./singletons-optionals.md) for more information.

:::

-To perform an aggregation with an unordered stream, you must use [`fold_commutative`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold_commutative), which requires the aggregation function to be commutative (and therefore immune to non-deterministic ordering):
+To perform an aggregation with an unordered stream, you must use [`fold_commutative`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold_commutative), which requires the provided closure to be commutative (and therefore immune to non-deterministic ordering):

```rust,no_run
# use hydro_lang::*;
@@ -107,11 +107,17 @@ To perform an aggregation with an unordered stream, you must use [`fold_commutat
# let all_words: Stream<_, Process<_>, _, NoOrder> = workers
# .source_iter(q!(vec!["hello", "world"]))
# .map(q!(|x| x.to_string()))
-# .send_bincode_interleaved(&process);
+# .send_bincode_anonymous(&process);
let words_count = all_words
.fold_commutative(q!(|| 0), q!(|acc, x| *acc += 1));
```
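
To see why the closure passed to `fold_commutative` has to be commutative, the following plain-Rust sketch folds the same two words in two different arrival orders. It does not use Hydro: `fold_in_order`, `concat`, and `count` are hypothetical helpers written for this example, using the same `(&mut acc, item)` closure shape as the snippets above. Comparing two orders is only a spot check, not a proof of commutativity.

```rust
// Applies a Hydro-style accumulator closure (`&mut A`, item) to `items` in the
// order given. This is a plain-Rust helper for the sketch, not a Hydro API.
fn fold_in_order<A, T: Clone>(items: &[T], init: A, f: impl Fn(&mut A, T)) -> A {
    let mut acc = init;
    for item in items {
        f(&mut acc, item.clone());
    }
    acc
}

// Order-sensitive: the result depends on the arrival order.
fn concat(acc: &mut String, x: String) {
    acc.push_str(&x);
}

// Order-insensitive: every arrival order produces the same count.
fn count(acc: &mut i32, _x: String) {
    *acc += 1;
}

fn main() {
    let forward = vec!["hello".to_string(), "world".to_string()];
    let reversed: Vec<String> = forward.iter().rev().cloned().collect();

    let concat_fwd = fold_in_order(&forward, String::new(), concat);
    let concat_rev = fold_in_order(&reversed, String::new(), concat);
    println!("{concat_fwd} vs {concat_rev}"); // helloworld vs worldhello

    let count_fwd = fold_in_order(&forward, 0, count);
    let count_rev = fold_in_order(&reversed, 0, count);
    println!("{count_fwd} vs {count_rev}"); // 2 vs 2
}
```

Concatenation gives a different answer per order, which is the kind of closure that would be unsafe on a `NoOrder` stream, while counting gives the same answer either way.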

+:::danger
+
+Developers are responsible for the commutativity of the closure they pass into `*_commutative` methods. In the future, commutativity checks will be automatically provided by the compiler (via tools like [Kani](https://github.com/model-checking/kani)).
+
+:::
+
## Bounded and Unbounded Streams

:::caution
6 changes: 3 additions & 3 deletions docs/docs/hydro/locations/clusters.md
@@ -49,15 +49,15 @@ numbers.send_bincode(&process)

:::tip

-If you do not need to know _which_ member of the cluster the data came from, you can use the `send_bincode_interleaved` method instead, which will drop the IDs at the receiver:
+If you do not need to know _which_ member of the cluster the data came from, you can use the `send_bincode_anonymous` method instead, which will drop the IDs at the receiver:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, process| {
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
-numbers.send_bincode_interleaved(&process)
+numbers.send_bincode_anonymous(&process)
# }, |mut stream| async move {
// if there are 4 members in the cluster, we should receive 4 elements
// 1, 1, 1, 1
@@ -166,7 +166,7 @@ let self_id_stream = workers.source_iter(q!([CLUSTER_SELF_ID]));
self_id_stream
.filter(q!(|x| x.raw_id % 2 == 0))
.map(q!(|x| format!("hello from {}", x.raw_id)))
-.send_bincode_interleaved(&process)
+.send_bincode_anonymous(&process)
// if there are 4 members in the cluster, we should receive 2 elements
// "hello from 0", "hello from 2"
# }, |mut stream| async move {
2 changes: 1 addition & 1 deletion docs/docs/hydro/quickstart/clusters.mdx
@@ -32,7 +32,7 @@ On each cluster member, we will then do some work to transform the data (using `

<CodeBlock language="rust" title="src/first_ten_cluster.rs">{getLines(firstTenClusterSrc, 10, 11)}</CodeBlock>

-Finally, we will send the data back to the leader. We achieve this using a variant of the APIs from before: `send_bincode_interleaved`. If we used `send_bincode`, we would get a stream of `(cluster ID, data)` tuples. Since it is a common pattern to ignore the IDs, `send_bincode_interleaved` is available as a helper.
+Finally, we will send the data back to the leader. We achieve this using a variant of the APIs from before: `send_bincode_anonymous`. If we used `send_bincode`, we would get a stream of `(cluster ID, data)` tuples. Since it is a common pattern to ignore the IDs, `send_bincode_anonymous` is available as a helper.

<CodeBlock language="rust" title="src/first_ten_cluster.rs">{getLines(firstTenClusterSrc, 12, 14)}</CodeBlock>
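
The difference between the two send variants described above can be modeled with plain Rust collections. This is only a sketch of the data shapes, not Hydro code: `MemberId`, `tagged`, and `anonymous` are hypothetical names, and a `Vec` stands in for the stream arriving at the leader.

```rust
// Hypothetical stand-in for a cluster member ID; not Hydro's `ClusterId` type.
#[derive(Debug, Clone, Copy, PartialEq)]
struct MemberId(u32);

// Models what the receiver sees when sender IDs are kept:
// a sequence of `(cluster ID, data)` tuples.
fn tagged() -> Vec<(MemberId, i32)> {
    vec![(MemberId(0), 2), (MemberId(1), 4), (MemberId(0), 6)]
}

// Models the anonymous variant: the same payloads with the IDs dropped.
fn anonymous(tagged: Vec<(MemberId, i32)>) -> Vec<i32> {
    tagged.into_iter().map(|(_id, data)| data).collect()
}

fn main() {
    let with_ids = tagged();
    println!("{:?}", with_ids);
    println!("{:?}", anonymous(with_ids)); // [2, 4, 6]
}
```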

2 changes: 1 addition & 1 deletion hydro_lang/src/stream.rs
@@ -1926,7 +1926,7 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, L, B, Order> {
}
}

-pub fn send_bincode_interleaved<L2: Location<'a>, Tag, CoreType>(
+pub fn send_bincode_anonymous<L2: Location<'a>, Tag, CoreType>(
self,
other: &L2,
) -> Stream<CoreType, L2, Unbounded, Order::Min>
4 changes: 2 additions & 2 deletions hydro_std/src/compartmentalize.rs
@@ -40,7 +40,7 @@ impl<'a, T, C1, C2, Order> PartitionStream<'a, T, C1, C2, Order>
Min = NoOrder,
>,
{
-self.map(dist_policy).send_bincode_interleaved(other)
+self.map(dist_policy).send_bincode_anonymous(other)
}
}

@@ -78,7 +78,7 @@ impl<'a, T, C1, B, Order> DecoupleClusterStream<'a, T, C1, B, Order>
ClusterId::from_raw(CLUSTER_SELF_ID.raw_id),
b.clone()
)))
-.send_bincode_interleaved(other);
+.send_bincode_anonymous(other);

unsafe {
// SAFETY: this is safe because we are mapping clusters 1:1
2 changes: 1 addition & 1 deletion hydro_test/src/cluster/compute_pi.rs
@@ -30,7 +30,7 @@ pub fn compute_pi<'a>(
.all_ticks();

let estimate = trials
-.send_bincode_interleaved(&process)
+.send_bincode_anonymous(&process)
.reduce_commutative(q!(|(inside, total), (inside_batch, total_batch)| {
*inside += inside_batch;
*total += total_batch;
2 changes: 1 addition & 1 deletion hydro_test/src/cluster/map_reduce.rs
@@ -25,7 +25,7 @@ pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<'
string, count
)))
.all_ticks()
-.send_bincode_interleaved(&process);
+.send_bincode_anonymous(&process);

unsafe {
// SAFETY: addition is associative so we can batch reduce
4 changes: 2 additions & 2 deletions hydro_test/src/cluster/paxos.rs
@@ -417,7 +417,7 @@ fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>(
)
)))
.all_ticks()
-.send_bincode_interleaved(proposers),
+.send_bincode_anonymous(proposers),
)
}

@@ -826,6 +826,6 @@ fn acceptor_p2<'a, P: PaxosPayload, R>(
)
)))
.all_ticks()
-.send_bincode_interleaved(proposers);
+.send_bincode_anonymous(proposers);
(a_log, a_to_proposers_p2b)
}
2 changes: 1 addition & 1 deletion hydro_test/src/cluster/paxos_bench.rs
@@ -63,7 +63,7 @@ pub fn paxos_bench<'a>(
payload.value.0,
((payload.key, payload.value.1), Ok(()))
)))
-.send_bincode_interleaved(&clients);
+.send_bincode_anonymous(&clients);

// we only mark a transaction as committed when all replicas have applied it
collect_quorum::<_, _, _, ()>(
2 changes: 1 addition & 1 deletion hydro_test/src/cluster/paxos_with_client.rs
@@ -59,7 +59,7 @@ pub unsafe fn paxos_with_client<'a, C: 'a, R, P: PaxosPayload>(
all_payloads.cross_singleton(latest_leader).all_ticks()
}
.map(q!(move |(payload, leader_id)| (leader_id, payload)))
-.send_bincode_interleaved(proposers);
+.send_bincode_anonymous(proposers);

let payloads_at_proposer = {
// SAFETY: documented non-determinism in interleaving of client payloads
2 changes: 1 addition & 1 deletion template/hydro/src/first_ten_cluster.rs
@@ -9,7 +9,7 @@ pub fn first_ten_cluster<'a>(leader: &Process<'a, Leader>, workers: &Cluster<'a,
.round_robin_bincode(workers) // : Stream<i32, Cluster<Worker>, ...>
.map(q!(|n| n * 2)) // : Stream<i32, Cluster<Worker>, ...>
.inspect(q!(|n| println!("{}", n))) // : Stream<i32, Cluster<Worker>, ...>
-.send_bincode_interleaved(leader) // : Stream<i32, Process<Leader>, ...>
+.send_bincode_anonymous(leader) // : Stream<i32, Process<Leader>, ...>
.for_each(q!(|n| println!("{}", n)));
}
