Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs(hydro_lang): add initial docs for clusters #1677

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/docs/hydro/live-collections/determinism.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ sidebar_position: 1
---

# Eventual Determinism
Most programs are strong guarantees on **determinism**, the property that when provided the same inputs, the outputs of the program are always the same. Even when the inputs and outputs are live collections, we can focus on the _eventual_ state of the collection (as if we froze the input and waited until the output stops changing).
Most programs have strong guarantees on **determinism**, the property that when provided the same inputs, the outputs of the program are always the same. Even when the inputs and outputs are live collections, we can focus on the _eventual_ state of the collection (as if we froze the input and waited until the output stops changing).

:::info

Expand Down
188 changes: 186 additions & 2 deletions docs/docs/hydro/locations/clusters.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,192 @@ sidebar_position: 1
---

# Clusters
:::caution
When building scalable distributed systems in Hydro, you'll often need to use **clusters**, which represent groups of threads all running the _same_ piece of your program (SPMD, Single-Program-Multiple-Data). They can be used to implement scale-out systems using techniques such as sharding or replication. Unlike processes, the number of threads in a cluster does not need to be static, and can be chosen during deployment.

The Hydro documentation is currently under active development! This page is a placeholder for future content.
Like when creating a process, you can pass in a type parameter to a cluster to distinguish it from other clusters. For example, you can create a cluster with a marker of `Worker` to represent a pool of workers in a distributed system:

```rust,no_run
# use hydro_lang::*;
struct Worker {}

let flow = FlowBuilder::new();
let workers: Cluster<Worker> = flow.cluster::<Worker>();
```

We can then instantiate a live collection on the cluster using the same APIs as for processes. For example, we can create a stream of integers on the worker cluster. If we launch this program, **each** member of the cluster will create a stream containing the elements 1, 2, 3, and 4:

```rust,no_run
# use hydro_lang::*;
# struct Worker {}
# let flow = FlowBuilder::new();
# let workers: Cluster<Worker> = flow.cluster::<Worker>();
let numbers = workers.source_iter(q!(vec![1, 2, 3, 4]));
```

## Networking
When sending a live collection from a cluster to another location, **each** member of the cluster will send its local collection. On the receiver side, these collections will be joined together into a single stream of `(ID, Data)` tuples where the ID uniquely identifies which member of the cluster the data came from. For example, we can send a stream from the worker cluster to another process using the `send_bincode` method:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, process| {
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
numbers.send_bincode(&process)
# }, |mut stream| async move {
// if there are 4 members in the cluster, we should receive 4 elements
// (ClusterId::<Worker>(0), 1), (ClusterId::<Worker>(1), 1), (ClusterId::<Worker>(2), 1), (ClusterId::<Worker>(3), 1)
# let mut results = Vec::new();
# for w in 0..4 {
# results.push(format!("{:?}", stream.next().await.unwrap()));
# }
# results.sort();
# assert_eq!(results, vec!["(ClusterId::<()>(0), 1)", "(ClusterId::<()>(1), 1)", "(ClusterId::<()>(2), 1)", "(ClusterId::<()>(3), 1)"]);
# }));
```

:::tip

If you do not need to know _which_ member of the cluster the data came from, you can use the `send_bincode_interleaved` method instead, which will drop the IDs at the receiver:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, process| {
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
numbers.send_bincode_interleaved(&process)
# }, |mut stream| async move {
// if there are 4 members in the cluster, we should receive 4 elements
// 1, 1, 1, 1
# let mut results = Vec::new();
# for w in 0..4 {
# results.push(format!("{:?}", stream.next().await.unwrap()));
# }
# results.sort();
# assert_eq!(results, vec!["1", "1", "1", "1"]);
# }));
```

:::

In the reverse direction, when sending a stream _to_ a cluster, the sender must prepare `(ID, Data)` tuples, where the ID uniquely identifies which member of the cluster the data is intended for. For example, we can send a stream from a process to the worker cluster using the `send_bincode` method:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, p2| {
# let p1 = flow.process::<()>();
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
let on_worker: Stream<_, Cluster<_>, _> = numbers
.map(q!(|x| (ClusterId::from_raw(x), x)))
.send_bincode(&workers);
on_worker.send_bincode(&p2)
// if there are 4 members in the cluster, we should receive 4 elements
// (ClusterId::<Worker>(0), 0), (ClusterId::<Worker>(1), 1), (ClusterId::<Worker>(2), 2), (ClusterId::<Worker>(3), 3)
# }, |mut stream| async move {
# let mut results = Vec::new();
# for w in 0..4 {
# results.push(format!("{:?}", stream.next().await.unwrap()));
# }
# results.sort();
# assert_eq!(results, vec!["(ClusterId::<()>(0), 0)", "(ClusterId::<()>(1), 1)", "(ClusterId::<()>(2), 2)", "(ClusterId::<()>(3), 3)"]);
# }));
```

## Broadcasting and Membership Lists
A common pattern in distributed systems is to broadcast data to all members of a cluster. In Hydro, this can be achieved using `broadcast_bincode`, which takes in a stream of **only data elements** and broadcasts them to all members of the cluster. For example, we can broadcast a stream of integers to the worker cluster:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, p2| {
# let p1 = flow.process::<()>();
# let workers: Cluster<()> = flow.cluster::<()>();
let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers);
on_worker.send_bincode(&p2)
// if there are 4 members in the cluster, we should receive 4 elements
// (ClusterId::<Worker>(0), 123), (ClusterId::<Worker>(1), 123), (ClusterId::<Worker>(2), 123), (ClusterId::<Worker>(3), 123)
# }, |mut stream| async move {
# let mut results = Vec::new();
# for w in 0..4 {
# results.push(format!("{:?}", stream.next().await.unwrap()));
# }
# results.sort();
# assert_eq!(results, vec!["(ClusterId::<()>(0), 123)", "(ClusterId::<()>(1), 123)", "(ClusterId::<()>(2), 123)", "(ClusterId::<()>(3), 123)"]);
# }));
```

:::warning

The current broadcast implementation assumes a static configuration where members cannot be added or removed at runtime. This will change in the future as Hydro will support dynamically scaled clusters.

:::

Under the hood, the `broadcast_bincode` API uses a list of members of the cluster provided by the deployment system. To manually access this list, you can use the `members` method on a cluster to get a value that can be used inside `q!(...)` blocks:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, p2| {
# let p1 = flow.process::<()>();
# let workers: Cluster<()> = flow.cluster::<()>();
# // do nothing on each worker
# workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
let cluster_members = workers.members();
let members_stream: Stream<ClusterId<_>, Process<_>, _> = p1
.source_iter(q!(cluster_members /* : &[ClusterId<Worker>] */))
.cloned();
members_stream.send_bincode(&p2)
// if there are 4 members in the cluster, we should receive 4 elements
// ClusterId::<Worker>(0), ClusterId::<Worker>(1), ClusterId::<Worker>(2), ClusterId::<Worker>(3)
# }, |mut stream| async move {
# let mut results = Vec::new();
# for w in 0..4 {
# results.push(format!("{:?}", stream.next().await.unwrap()));
# }
# results.sort();
# assert_eq!(results, vec!["ClusterId::<()>(0)", "ClusterId::<()>(1)", "ClusterId::<()>(2)", "ClusterId::<()>(3)"]);
# }));
```

## Self-Identification
In some programs, it may be necessary for cluster members to know their own ID (for example, to construct a ballot in Paxos). In Hydro, this can be achieved by using the `CLUSTER_SELF_ID` constant, which can be used inside `q!(...)` blocks to get the current cluster member's ID:

```rust
# use hydro_lang::*;
# use dfir_rs::futures::StreamExt;
# tokio_test::block_on(test_util::multi_location_test(|flow, process| {
let workers: Cluster<()> = flow.cluster::<()>();
let self_id_stream = workers.source_iter(q!([CLUSTER_SELF_ID]));
self_id_stream
.filter(q!(|x| x.raw_id % 2 == 0))
.map(q!(|x| format!("hello from {}", x.raw_id)))
.send_bincode_interleaved(&process)
// if there are 4 members in the cluster, we should receive 2 elements
// "hello from 0", "hello from 2"
# }, |mut stream| async move {
# let mut results = Vec::new();
# for w in 0..2 {
# results.push(stream.next().await.unwrap());
# }
# results.sort();
# assert_eq!(results, vec!["hello from 0", "hello from 2"]);
# }));
```

:::info

You can only use `CLUSTER_SELF_ID` in code that will run on a `Cluster<_>`, such as when calling `Stream::map` when that stream is on a cluster. If you try to use it in code that will run on a `Process<_>`, you'll get a compile-time error:

```compile_fail
# use hydro_lang::*;
# let flow = FlowBuilder::new();
let process: Process<()> = flow.process::<()>();
process.source_iter(q!([CLUSTER_SELF_ID]));
// error[E0277]: the trait bound `ClusterSelfId<'_>: FreeVariableWithContext<hydro_lang::Process<'_>>` is not satisfied
```

:::
25 changes: 17 additions & 8 deletions docs/src/theme/prism-include-languages.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import siteConfig from '@generated/docusaurus.config';
import siteConfig from "@generated/docusaurus.config";
export default function prismIncludeLanguages(PrismObject) {
const {
themeConfig: {prism},
themeConfig: { prism },
} = siteConfig;
const {additionalLanguages} = prism;
const { additionalLanguages } = prism;

const PrismBefore = globalThis.Prism;
globalThis.Prism = PrismObject;
additionalLanguages.forEach((lang) => {
Expand All @@ -13,17 +13,26 @@ export default function prismIncludeLanguages(PrismObject) {
});
Prism.languages["rust,ignore"] = Prism.languages.rust;
Prism.languages["rust,no_run"] = Prism.languages.rust;
Prism.languages["compile_fail"] = Prism.languages.rust;

const origTokenize = PrismObject.tokenize;
PrismObject.hooks.add("after-tokenize", function(env) {
if (env.language === "rust" || env.language === "rust,ignore" || env.language === "rust,no_run") {
let code = env.code.split("\n").filter(line => !line.startsWith("# ")).join("\n");
PrismObject.hooks.add("after-tokenize", function (env) {
if (
env.language === "rust" ||
env.language === "rust,ignore" ||
env.language === "rust,no_run" ||
env.language === "compile_fail"
) {
let code = env.code
.split("\n")
.filter((line) => !line.startsWith("# "))
.join("\n");
env.tokens = origTokenize(code, env.grammar);
}
});

delete globalThis.Prism;
if (typeof PrismBefore !== 'undefined') {
if (typeof PrismBefore !== "undefined") {
globalThis.Prism = PrismObject;
}
}
Loading