Commit 41e5bb9

refactor(hydro_lang)!: rename _interleaved to _anonymous (#1695)
Also address docs feedback for streams.
1 parent 2fc071e commit 41e5bb9

11 files changed, +36 -30 lines changed

docs/docs/hydro/live-collections/streams.md

+22-16
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@ sidebar_position: 2
33
---
44

55
# Streams
6-
Streams are the most common type of live collection in Hydro; they can be used to model streaming data collections, a feed of API requests, or even time-based intervals. A `Stream` represents a sequence of elements, with new elements being asynchronously appended to the end of the sequence. Streams can be transformed using APIs like `map` and `filter`, based on Rust [iterators](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html). You can view the full API documentation for Streams [here](pathname:///rustdoc/hydro_lang/stream/struct.Stream).
6+
Streams are the most common type of live collection in Hydro; they can be used to model streaming data collections or a feed of API requests. A `Stream` represents a sequence of elements, with new elements being asynchronously appended to the end of the sequence. Streams can be transformed using APIs like `map` and `filter`, based on Rust [iterators](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html). You can view the full API documentation for Streams [here](pathname:///rustdoc/hydro_lang/stream/struct.Stream).
77

88
Streams have several type parameters:
99
- `T`: the type of elements in the stream
@@ -13,14 +13,14 @@ Streams have several type parameters:
1313
- This type parameter is _optional_; by default the order is deterministic
1414

1515
## Creating a Stream
16-
The simplest way to create a stream is to use the [`source_iter`](https://hydro.run/rustdoc/hydro_lang/location/trait.Location#method.source_iter) method on a location, which creates a stream from any Rust type that can be converted into an [`Iterator`](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html) (via [`IntoIterator`](https://doc.rust-lang.org/std/iter/trait.IntoIterator.html)). For example, we can create a stream of integers on a [process](../locations/processes.md) and transform it:
16+
The simplest way to create a stream is to use [`Location::source_iter`](https://hydro.run/rustdoc/hydro_lang/location/trait.Location#method.source_iter), which creates a stream from any Rust type that can be converted into an [`Iterator`](https://doc.rust-lang.org/beta/std/iter/trait.Iterator.html) (via [`IntoIterator`](https://doc.rust-lang.org/std/iter/trait.IntoIterator.html)). For example, we can create a stream of integers on a [process](../locations/processes.md) and transform it:
1717

1818
```rust
1919
# use hydro_lang::*;
2020
# use dfir_rs::futures::StreamExt;
2121
# tokio_test::block_on(test_util::multi_location_test(|flow, p_out| {
2222
let process = flow.process::<()>();
23-
let numbers: Stream<_, Process<_>, _> = process
23+
let numbers: Stream<_, Process<_>, Unbounded> = process
2424
.source_iter(q!(vec![1, 2, 3]))
2525
.map(q!(|x| x + 1));
2626
// 2, 3, 4
@@ -32,16 +32,16 @@ let numbers: Stream<_, Process<_>, _> = process
3232
# }));
3333
```
3434

35-
Streams also can be sent over the network to form distributed programs. Under the hood, sending a stream sets up an RPC handler at the target location that will receive the stream elements. For example, we can send a stream of integers from one process to another with [bincode](https://docs.rs/bincode/latest/bincode/) serialization:
35+
Streams also can be sent over the network to participate in distributed programs. Under the hood, sending a stream sets up an RPC handler at the target location that will receive the stream elements. For example, we can send a stream of integers from one process to another with [bincode](https://docs.rs/bincode/latest/bincode/) serialization:
3636

3737
```rust
3838
# use hydro_lang::*;
3939
# use dfir_rs::futures::StreamExt;
4040
# tokio_test::block_on(test_util::multi_location_test(|flow, p_out| {
4141
let p1 = flow.process::<()>();
42-
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![1, 2, 3]));
42+
let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
4343
let p2 = flow.process::<()>();
44-
let on_p2: Stream<_, Process<_>, _> = numbers.send_bincode(&p2);
44+
let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
4545
// 1, 2, 3
4646
# on_p2.send_bincode(&p_out)
4747
# }, |mut stream| async move {
@@ -62,41 +62,41 @@ If we send a stream from a cluster to a process, the return type will be a strea
6262
# use hydro_lang::*;
6363
# let flow = FlowBuilder::new();
6464
let workers: Cluster<()> = flow.cluster::<()>();
65-
let numbers: Stream<_, Cluster<_>, _, TotalOrder> =
65+
let numbers: Stream<_, Cluster<_>, Unbounded, TotalOrder> =
6666
workers.source_iter(q!(vec![1, 2, 3]));
6767
let process: Process<()> = flow.process::<()>();
68-
let on_p2: Stream<_, Process<_>, _, NoOrder> =
68+
let on_p2: Stream<_, Process<_>, Unbounded, NoOrder> =
6969
numbers.send_bincode(&process);
7070
```
7171

7272
The ordering of a stream determines which APIs are available on it. For example, `map` and `filter` are available on all streams, but `last` is only available on streams with `TotalOrder`. This ensures that even when the network introduces non-determinism, the program will not compile if it tries to use an API that requires a deterministic order.
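
The idea of gating APIs on an ordering type parameter can be sketched in plain Rust. This is only an illustration under stated assumptions: `ToyStream`, `forget_order`, and the marker structs defined here are hypothetical stand-ins and are not Hydro's actual types or implementation. The sketch shows a method (`last`) that exists only when the ordering tag is `TotalOrder`, so calling it on an unordered value is rejected at compile time.

```rust
use std::marker::PhantomData;

// Hypothetical marker types standing in for ordering guarantees.
// These are NOT the definitions used by hydro_lang.
struct TotalOrder;
struct NoOrder;

// A toy stream: a Vec of items plus a compile-time ordering tag.
struct ToyStream<T, Order> {
    items: Vec<T>,
    _order: PhantomData<Order>,
}

impl<T, Order> ToyStream<T, Order> {
    fn new(items: Vec<T>) -> Self {
        ToyStream { items, _order: PhantomData }
    }

    // Order-insensitive operations are available for every ordering tag.
    fn map<U>(self, f: impl Fn(T) -> U) -> ToyStream<U, Order> {
        ToyStream::new(self.items.into_iter().map(f).collect())
    }

    // Models a hop that may reorder elements: the tag is downgraded to NoOrder.
    fn forget_order(self) -> ToyStream<T, NoOrder> {
        ToyStream::new(self.items)
    }
}

// `last` is only defined when the ordering tag is `TotalOrder`.
impl<T> ToyStream<T, TotalOrder> {
    fn last(self) -> Option<T> {
        self.items.into_iter().last()
    }
}

fn ordered_last() -> Option<i32> {
    let ordered: ToyStream<i32, TotalOrder> = ToyStream::new(vec![1, 2, 3]);
    ordered.map(|x| x + 1).last()
}

fn unordered_len() -> usize {
    let ordered: ToyStream<i32, TotalOrder> = ToyStream::new(vec![1, 2, 3]);
    let unordered = ordered.forget_order().map(|x| x * 2);
    // unordered.last(); // would not compile: no `last` on ToyStream<_, NoOrder>
    unordered.items.len()
}

fn main() {
    assert_eq!(ordered_last(), Some(4));
    assert_eq!(unordered_len(), 3);
}
```

Uncommenting the `unordered.last()` line should produce a "method not found" style error, which is the same kind of compile-time rejection the paragraph above describes.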
7373

74-
A particularly common API that faces this restriction is [`fold`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold) (and [`reduce`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.reduce)). These APIs require the stream to have a deterministic order, since the aggregation may depend on the order of elements. For example, the following code will not compile because `fold` is not available on `NoOrder` streams. The error is a bit misleading, but the key part is that `fold` is not available on `NoOrder` streams:
74+
A particularly common API that faces this restriction is [`fold`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold) (and [`reduce`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.reduce)). These APIs require the stream to have a deterministic order, since the result may depend on the order of elements. For example, the following code will not compile because `fold` is not available on `NoOrder` streams (note that the error is a bit misleading due to the Rust compiler attempting to apply `Iterator` methods):
7575

7676
```compile_fail
7777
# use hydro_lang::*;
7878
# let flow = FlowBuilder::new();
7979
let workers: Cluster<()> = flow.cluster::<()>();
8080
let process: Process<()> = flow.process::<()>();
81-
let all_words: Stream<_, Process<_>, _, NoOrder> = workers
81+
let all_words: Stream<_, Process<_>, Unbounded, NoOrder> = workers
8282
.source_iter(q!(vec!["hello", "world"]))
8383
.map(q!(|x| x.to_string()))
84-
.send_bincode_interleaved(&process);
84+
.send_bincode_anonymous(&process);
8585
8686
let words_concat = all_words
8787
.fold(q!(|| "".to_string()), q!(|acc, x| acc += x));
88-
// ^^^^ `hydro_lang::Stream<&str, hydro_lang::Process<'_>, hydro_lang::Unbounded, NoOrder>` is not an iterator
88+
// ^^^^ error: `hydro_lang::Stream<String, hydro_lang::Process<'_>, hydro_lang::Unbounded, NoOrder>` is not an iterator
8989
```
9090

9191
:::tip
9292

93-
We use `send_bincode_interleaved` here to drop the cluster IDs which are included in `send_bincode`. See [Clusters](../locations/clusters.md) for more details.
93+
We use `send_bincode_anonymous` here to drop the cluster IDs which are included in `send_bincode`. See [Clusters](../locations/clusters.md) for more details.
9494

95-
You'll notice that we aggregated an **asynchronously** updated stream, so the result is a `Singleton` live collection. For more details on the semantics of singletons, including how they are updated when new inputs arrive, see [Singletons and Optionals](./singletons-optionals.md).
95+
Running an aggregation (`fold`, `reduce`) converts a `Stream` into a `Singleton`, as we see in the type signature here. The `Singleton` type is still "live" in the sense of a [Live Collection](./index.md), so updates to the `Stream` input cause updates to the `Singleton` output. See [Singletons and Optionals](./singletons-optionals.md) for more information.
9696

9797
:::
9898

99-
To perform an aggregation with an unordered stream, you must use [`fold_commutative`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold_commutative), which requires the aggregation function to be commutative (and therefore immune to non-deterministic ordering):
99+
To perform an aggregation with an unordered stream, you must use [`fold_commutative`](pathname:///rustdoc/hydro_lang/stream/struct.Stream#method.fold_commutative), which requires the provided closure to be commutative (and therefore immune to non-deterministic ordering):
100100

101101
```rust,no_run
102102
# use hydro_lang::*;
@@ -107,11 +107,17 @@ To perform an aggregation with an unordered stream, you must use [`fold_commutat
107107
# let all_words: Stream<_, Process<_>, _, NoOrder> = workers
108108
# .source_iter(q!(vec!["hello", "world"]))
109109
# .map(q!(|x| x.to_string()))
110-
# .send_bincode_interleaved(&process);
110+
# .send_bincode_anonymous(&process);
111111
let words_count = all_words
112112
.fold_commutative(q!(|| 0), q!(|acc, x| *acc += 1));
113113
```
114114

115+
:::danger
116+
117+
Developers are responsible for the commutativity of the closure they pass into `*_commutative` methods. In the future, commutativity checks will be automatically provided by the compiler (via tools like [Kani](https://github.com/model-checking/kani)).
118+
119+
:::
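
To make the commutativity requirement concrete, here is a minimal sketch using only the Rust standard library. The helper `fold_in_order` and the functions `count_words` and `concat_words` are hypothetical names introduced for illustration; they only mimic the `|acc, x|` closure shape and do not use or model the real `fold_commutative` API. The sketch folds the same elements in two arrival orders: counting gives the same result either way, while string concatenation does not, which is why the latter would be unsafe to pass to a `*_commutative` method.

```rust
// Fold `items` left to right with an accumulator-mutating closure,
// mirroring the `|acc, x|` closure shape used in the docs.
// This helper is an assumption for illustration, not a Hydro API.
fn fold_in_order<A, T: Clone>(items: &[T], init: A, f: impl Fn(&mut A, T)) -> A {
    let mut acc = init;
    for item in items {
        f(&mut acc, item.clone());
    }
    acc
}

// Commutative: the count does not depend on arrival order.
fn count_words(words: &[String]) -> usize {
    fold_in_order(words, 0usize, |acc, _word| *acc += 1)
}

// Not commutative: the concatenation depends on arrival order.
fn concat_words(words: &[String]) -> String {
    fold_in_order(words, String::new(), |acc, word| acc.push_str(&word))
}

fn main() {
    // Two possible arrival orders for the same set of elements.
    let arrival_a = vec!["hello".to_string(), "world".to_string()];
    let arrival_b = vec!["world".to_string(), "hello".to_string()];

    // Counting is insensitive to the order of arrival.
    assert_eq!(count_words(&arrival_a), count_words(&arrival_b));

    // Concatenation yields different results for different orders.
    assert_ne!(concat_words(&arrival_a), concat_words(&arrival_b));
}
```

Checking a closure against a few permutations like this is only a sanity test, not a proof of commutativity.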
120+
115121
## Bounded and Unbounded Streams
116122

117123
:::caution

docs/docs/hydro/locations/clusters.md

+3-3
@@ -49,15 +49,15 @@ numbers.send_bincode(&process)
4949

5050
:::tip
5151

52-
If you do not need to know _which_ member of the cluster the data came from, you can use the `send_bincode_interleaved` method instead, which will drop the IDs at the receiver:
52+
If you do not need to know _which_ member of the cluster the data came from, you can use the `send_bincode_anonymous` method instead, which will drop the IDs at the receiver:
5353

5454
```rust
5555
# use hydro_lang::*;
5656
# use dfir_rs::futures::StreamExt;
5757
# tokio_test::block_on(test_util::multi_location_test(|flow, process| {
5858
# let workers: Cluster<()> = flow.cluster::<()>();
5959
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
60-
numbers.send_bincode_interleaved(&process)
60+
numbers.send_bincode_anonymous(&process)
6161
# }, |mut stream| async move {
6262
// if there are 4 members in the cluster, we should receive 4 elements
6363
// 1, 1, 1, 1
@@ -166,7 +166,7 @@ let self_id_stream = workers.source_iter(q!([CLUSTER_SELF_ID]));
166166
self_id_stream
167167
.filter(q!(|x| x.raw_id % 2 == 0))
168168
.map(q!(|x| format!("hello from {}", x.raw_id)))
169-
.send_bincode_interleaved(&process)
169+
.send_bincode_anonymous(&process)
170170
// if there are 4 members in the cluster, we should receive 2 elements
171171
// "hello from 0", "hello from 2"
172172
# }, |mut stream| async move {

docs/docs/hydro/quickstart/clusters.mdx

+1-1
@@ -32,7 +32,7 @@ On each cluster member, we will then do some work to transform the data (using `
3232

3333
<CodeBlock language="rust" title="src/first_ten_cluster.rs">{getLines(firstTenClusterSrc, 10, 11)}</CodeBlock>
3434

35-
Finally, we will send the data back to the leader. We achieve this using a variant of the APIs from before: `send_bincode_interleaved`. If we used `send_bincode`, we would get a stream of `(cluster ID, data)` tuples. Since it is a common pattern to ignore the IDs, `send_bincode_interleaved` is available as a helper.
35+
Finally, we will send the data back to the leader. We achieve this using a variant of the APIs from before: `send_bincode_anonymous`. If we used `send_bincode`, we would get a stream of `(cluster ID, data)` tuples. Since it is a common pattern to ignore the IDs, `send_bincode_anonymous` is available as a helper.
3636

3737
<CodeBlock language="rust" title="src/first_ten_cluster.rs">{getLines(firstTenClusterSrc, 12, 14)}</CodeBlock>
3838

hydro_lang/src/stream.rs

+1-1
@@ -1926,7 +1926,7 @@ impl<'a, T, L: Location<'a> + NoTick, B, Order> Stream<T, L, B, Order> {
19261926
}
19271927
}
19281928

1929-
pub fn send_bincode_interleaved<L2: Location<'a>, Tag, CoreType>(
1929+
pub fn send_bincode_anonymous<L2: Location<'a>, Tag, CoreType>(
19301930
self,
19311931
other: &L2,
19321932
) -> Stream<CoreType, L2, Unbounded, Order::Min>

hydro_std/src/compartmentalize.rs

+2-2
@@ -40,7 +40,7 @@ impl<'a, T, C1, C2, Order> PartitionStream<'a, T, C1, C2, Order>
4040
Min = NoOrder,
4141
>,
4242
{
43-
self.map(dist_policy).send_bincode_interleaved(other)
43+
self.map(dist_policy).send_bincode_anonymous(other)
4444
}
4545
}
4646

@@ -78,7 +78,7 @@ impl<'a, T, C1, B, Order> DecoupleClusterStream<'a, T, C1, B, Order>
7878
ClusterId::from_raw(CLUSTER_SELF_ID.raw_id),
7979
b.clone()
8080
)))
81-
.send_bincode_interleaved(other);
81+
.send_bincode_anonymous(other);
8282

8383
unsafe {
8484
// SAFETY: this is safe because we are mapping clusters 1:1

hydro_test/src/cluster/compute_pi.rs

+1-1
@@ -30,7 +30,7 @@ pub fn compute_pi<'a>(
3030
.all_ticks();
3131

3232
let estimate = trials
33-
.send_bincode_interleaved(&process)
33+
.send_bincode_anonymous(&process)
3434
.reduce_commutative(q!(|(inside, total), (inside_batch, total_batch)| {
3535
*inside += inside_batch;
3636
*total += total_batch;

hydro_test/src/cluster/map_reduce.rs

+1-1
@@ -25,7 +25,7 @@ pub fn map_reduce<'a>(flow: &FlowBuilder<'a>) -> (Process<'a, Leader>, Cluster<'
2525
string, count
2626
)))
2727
.all_ticks()
28-
.send_bincode_interleaved(&process);
28+
.send_bincode_anonymous(&process);
2929

3030
unsafe {
3131
// SAFETY: addition is associative so we can batch reduce

hydro_test/src/cluster/paxos.rs

+2-2
@@ -417,7 +417,7 @@ fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>(
417417
)
418418
)))
419419
.all_ticks()
420-
.send_bincode_interleaved(proposers),
420+
.send_bincode_anonymous(proposers),
421421
)
422422
}
423423

@@ -826,6 +826,6 @@ fn acceptor_p2<'a, P: PaxosPayload, R>(
826826
)
827827
)))
828828
.all_ticks()
829-
.send_bincode_interleaved(proposers);
829+
.send_bincode_anonymous(proposers);
830830
(a_log, a_to_proposers_p2b)
831831
}

hydro_test/src/cluster/paxos_bench.rs

+1-1
@@ -63,7 +63,7 @@ pub fn paxos_bench<'a>(
6363
payload.value.0,
6464
((payload.key, payload.value.1), Ok(()))
6565
)))
66-
.send_bincode_interleaved(&clients);
66+
.send_bincode_anonymous(&clients);
6767

6868
// we only mark a transaction as committed when all replicas have applied it
6969
collect_quorum::<_, _, _, ()>(

hydro_test/src/cluster/paxos_with_client.rs

+1-1
@@ -59,7 +59,7 @@ pub unsafe fn paxos_with_client<'a, C: 'a, R, P: PaxosPayload>(
5959
all_payloads.cross_singleton(latest_leader).all_ticks()
6060
}
6161
.map(q!(move |(payload, leader_id)| (leader_id, payload)))
62-
.send_bincode_interleaved(proposers);
62+
.send_bincode_anonymous(proposers);
6363

6464
let payloads_at_proposer = {
6565
// SAFETY: documented non-determinism in interleaving of client payloads

template/hydro/src/first_ten_cluster.rs

+1-1
@@ -9,7 +9,7 @@ pub fn first_ten_cluster<'a>(leader: &Process<'a, Leader>, workers: &Cluster<'a,
99
.round_robin_bincode(workers) // : Stream<i32, Cluster<Worker>, ...>
1010
.map(q!(|n| n * 2)) // : Stream<i32, Cluster<Worker>, ...>
1111
.inspect(q!(|n| println!("{}", n))) // : Stream<i32, Cluster<Worker>, ...>
12-
.send_bincode_interleaved(leader) // : Stream<i32, Process<Leader>, ...>
12+
.send_bincode_anonymous(leader) // : Stream<i32, Process<Leader>, ...>
1313
.for_each(q!(|n| println!("{}", n)));
1414
}
1515
