diff --git a/.github/workflows/build-cli.yml b/.github/workflows/build-cli.yml
index 285a04b77b2..e4c7485fe93 100644
--- a/.github/workflows/build-cli.yml
+++ b/.github/workflows/build-cli.yml
@@ -105,17 +105,8 @@ jobs:
with:
working-directory: hydro_cli
target: ${{ matrix.platform.target }}
- manylinux: musllinux_1_2
+ manylinux: auto
args: --release --out dist
- - uses: uraimo/run-on-arch-action@master
- name: Install built wheel
- with:
- arch: ${{ matrix.platform.arch }}
- distro: alpine_latest
- install: |
- apk add py3-pip
- run: |
- pip3 install ${{ env.PACKAGE_NAME }} --no-index --find-links hydro_cli/dist/ --force-reinstall
- name: "Upload wheels"
uses: actions/upload-artifact@v3
with:
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a6e5cbfd1e3..fd27b28f5c1 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -56,7 +56,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: check
- args: --all-targets
+ args: --all-targets --features python
check-wasm:
name: Check WebAssembly
@@ -91,7 +91,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: check
- args: -p hydroflow_lang --target wasm32-unknown-unknown --no-default-features
+ args: -p hydroflow_lang --target wasm32-unknown-unknown
test:
name: Test Suite
@@ -121,11 +121,17 @@ jobs:
toolchain: nightly
override: ${{ matrix.rust_release == 'latest-nightly' }}
- - name: Run cargo test
+ - name: Run cargo test all targets (which does not include doctests)
+ uses: actions-rs/cargo@v1
+ with:
+ command: test
+ args: --no-fail-fast --all-targets --features python
+
+ - name: Run cargo doc tests
uses: actions-rs/cargo@v1
with:
command: test
- args: --no-fail-fast
+ args: --no-fail-fast --doc --features python
test-wasm:
name: Test Suite (WebAssembly)
@@ -275,7 +281,21 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: clippy
- args: --all-targets -- -D warnings
+ args: --all-targets --features python -- -D warnings
+
+ build-website:
+ name: Build Website
+ if: ${{ needs.pre_job.outputs.should_skip != 'true' || github.event_name != 'pull_request' }}
+ timeout-minutes: 25
+ needs: pre_job
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout sources
+ uses: actions/checkout@v3
+
+ - name: Build Website
+ run: bash build_docs.bash x86_64-linux-gnu-ubuntu-20.04
docs:
name: Docs (rustdoc)
diff --git a/Cargo.lock b/Cargo.lock
index 9c288400cc9..ac363114db8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -36,9 +36,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aho-corasick"
-version = "0.7.20"
+version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac"
+checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41"
dependencies = [
"memchr",
]
@@ -1291,7 +1291,7 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hydro_cli"
-version = "0.2.0"
+version = "0.4.0"
dependencies = [
"anyhow",
"async-channel",
@@ -1340,7 +1340,7 @@ dependencies = [
[[package]]
name = "hydroflow"
-version = "0.2.0"
+version = "0.4.0"
dependencies = [
"bincode",
"byteorder",
@@ -1359,10 +1359,12 @@ dependencies = [
"hydroflow_lang",
"hydroflow_macro",
"insta",
+ "instant",
"itertools",
"lattices",
"multiplatform_test",
"pusherator",
+ "pyo3",
"rand 0.8.5",
"rand_distr",
"ref-cast",
@@ -1389,7 +1391,7 @@ dependencies = [
[[package]]
name = "hydroflow_cli_integration"
-version = "0.2.0"
+version = "0.3.0"
dependencies = [
"async-recursion",
"async-trait",
@@ -1405,7 +1407,7 @@ dependencies = [
[[package]]
name = "hydroflow_datalog"
-version = "0.2.0"
+version = "0.4.0"
dependencies = [
"hydroflow_datalog_core",
"proc-macro-crate",
@@ -1416,7 +1418,7 @@ dependencies = [
[[package]]
name = "hydroflow_datalog_core"
-version = "0.2.0"
+version = "0.4.0"
dependencies = [
"hydroflow_lang",
"insta",
@@ -1433,7 +1435,7 @@ dependencies = [
[[package]]
name = "hydroflow_lang"
-version = "0.2.0"
+version = "0.4.0"
dependencies = [
"auto_impl",
"itertools",
@@ -1449,7 +1451,7 @@ dependencies = [
[[package]]
name = "hydroflow_macro"
-version = "0.2.0"
+version = "0.4.0"
dependencies = [
"hydroflow_lang",
"itertools",
@@ -1505,11 +1507,12 @@ dependencies = [
[[package]]
name = "indicatif"
-version = "0.17.3"
+version = "0.17.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cef509aa9bc73864d6756f0d34d35504af3cf0844373afe9b8669a5b8005a729"
+checksum = "0b297dc40733f23a0e52728a58fa9489a5b7638a324932de16b41adc3ef80730"
dependencies = [
"console",
+ "instant",
"number_prefix",
"portable-atomic",
"unicode-width",
@@ -1541,6 +1544,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if 1.0.0",
+ "js-sys",
+ "wasm-bindgen",
+ "web-sys",
]
[[package]]
@@ -1624,7 +1630,7 @@ dependencies = [
[[package]]
name = "lattices"
-version = "0.2.0"
+version = "0.4.0"
dependencies = [
"cc-traits",
"sealed",
@@ -2112,9 +2118,9 @@ dependencies = [
[[package]]
name = "portable-atomic"
-version = "0.3.19"
+version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "26f6a7b87c2e435a3241addceeeff740ff8b7e76b74c13bf9acb17fa454ea00b"
+checksum = "31114a898e107c51bb1609ffaf55a0e011cf6a4d7f1170d0015a165082c0338b"
[[package]]
name = "ppv-lite86"
@@ -2168,9 +2174,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
-version = "1.0.56"
+version = "1.0.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435"
+checksum = "7b368fba921b0dce7e60f5e04ec15e565b3303972b42bcfde1d0713b881959eb"
dependencies = [
"unicode-ident",
]
@@ -2189,7 +2195,7 @@ dependencies = [
[[package]]
name = "pusherator"
-version = "0.0.1"
+version = "0.0.3"
dependencies = [
"either",
"variadics",
@@ -2459,13 +2465,13 @@ dependencies = [
[[package]]
name = "regex"
-version = "1.7.3"
+version = "1.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8b1f693b24f6ac912f4893ef08244d70b6067480d2f1a46e950c9691e6749d1d"
+checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f"
dependencies = [
"aho-corasick",
"memchr",
- "regex-syntax",
+ "regex-syntax 0.7.2",
]
[[package]]
@@ -2474,7 +2480,7 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
- "regex-syntax",
+ "regex-syntax 0.6.29",
]
[[package]]
@@ -2483,6 +2489,12 @@ version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
+[[package]]
+name = "regex-syntax"
+version = "0.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"
+
[[package]]
name = "relalg"
version = "0.0.0"
@@ -3141,6 +3153,22 @@ dependencies = [
"winnow",
]
+[[package]]
+name = "topolotree"
+version = "0.0.0"
+dependencies = [
+ "dashmap",
+ "futures",
+ "hydroflow",
+ "hydroflow_datalog",
+ "procinfo",
+ "rand 0.8.5",
+ "serde",
+ "serde_json",
+ "tokio",
+ "tokio-tungstenite",
+]
+
[[package]]
name = "tracing"
version = "0.1.37"
@@ -3242,7 +3270,7 @@ dependencies = [
"lazy_static",
"log",
"regex",
- "regex-syntax",
+ "regex-syntax 0.6.29",
"rustc-hash",
"semver 1.0.17",
"serde",
diff --git a/Cargo.toml b/Cargo.toml
index 7bc13064ba0..a5b428dd328 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@ members = [
"multiplatform_test",
"pusherator",
"relalg",
+ "topolotree",
"variadics",
"website_playground",
]
diff --git a/README.md b/README.md
index 25eb46374a1..a97230b747c 100644
--- a/README.md
+++ b/README.md
@@ -7,8 +7,8 @@
-Hydroflow is a low-latency dataflow runtime written in Rust. The [Hydro Project](https://hydro.run/docs/hydroflow/ecosystem/)
-will empower developers to harness the full potential of the cloud by making distributed programs easy to specify and automatic to scale. Hydroflow serves as the lowest level in the [Hydro stack](https://hydro.run/docs/hydroflow/ecosystem/),
+Hydroflow is a low-latency dataflow runtime written in Rust. The goal of the [Hydro Project](https://hydro.run)
+is to empower developers to harness the full potential of the cloud by making distributed programs easy to specify and automatic to scale. Hydroflow is the lowest level in the [Hydro stack](https://hydro.run/docs/hydroflow/ecosystem/),
serving as a single-node low-latency runtime with explicit networking. This allows us to support
not just data processing pipelines, but distributed protocols (e.g. Paxos) and real-world
long-running applications as well.
@@ -17,16 +17,16 @@ Take a look at the [Hydroflow Book](https://hydro.run/docs/hydroflow/).
## The Hydroflow Surface Syntax
-Hydroflow comes with a custom "surface syntax" domain-specific language which serves as a very
-simple, readable IR for specifying single-node Hydroflow programs, intended to be stitched together
-by the Hydro stack to create larger autoscaling distributed systems.'
+Hydroflow comes with a custom "surface syntax"—a domain-specific language which serves as a very
+simple, readable IR for specifying single-node Hydroflow programs. These programs are intended to be stitched together
+by the Hydro stack to create larger autoscaling distributed systems.
Here's a simple example of the surface syntax. Check out the [Hydroflow Playground](https://hydro.run/playground)
for an interactive demo.
```rust
source_iter(0..10)
-> map(|n| n * n)
- -> filter(|&n| n > 10)
+ -> filter(|n| *n > 10)
-> foo;
foo = map(|n| (n..=n+1))
diff --git a/benches/benches/fork_join.rs b/benches/benches/fork_join.rs
index 1e06f0da331..58ca87868ca 100644
--- a/benches/benches/fork_join.rs
+++ b/benches/benches/fork_join.rs
@@ -54,11 +54,7 @@ fn benchmark_hydroflow(c: &mut Criterion) {
send1,
send2,
|_ctx, recv1, recv2, send1, send2| {
- for v in recv1
- .take_inner()
- .into_iter()
- .chain(recv2.take_inner().into_iter())
- {
+ for v in recv1.take_inner().into_iter().chain(recv2.take_inner()) {
if v % 2 == 0 {
send1.give(Some(v));
} else {
diff --git a/benches/benches/join.rs b/benches/benches/join.rs
index 385f60e2cdb..409820d2536 100644
--- a/benches/benches/join.rs
+++ b/benches/benches/join.rs
@@ -61,7 +61,7 @@ where
}
}
- left_tab.entry(k).or_insert_with(Vec::new).push(v);
+ left_tab.entry(k).or_default().push(v);
}
});
@@ -76,7 +76,7 @@ where
}
}
- right_tab.entry(k).or_insert_with(Vec::new).push(v);
+ right_tab.entry(k).or_default().push(v);
}
});
}
@@ -98,8 +98,8 @@ where
b.iter(|| {
let iter_a = (0..NUM_INTS).map(|x| (x, L::new(x)));
let iter_b = (0..NUM_INTS).map(|x| (x, R::new(x)));
- let mut items_a = HashMap::new();
- let mut items_b = HashMap::new();
+ let mut items_a = HashMap::<_, Vec<_>>::new();
+ let mut items_b = HashMap::<_, Vec<_>>::new();
for (key, val_a) in iter_a {
if let Some(vals_b) = items_b.get(&key) {
@@ -107,7 +107,7 @@ where
black_box((key, val_a.clone(), val_b));
}
}
- items_a.entry(key).or_insert_with(Vec::new).push(val_a);
+ items_a.entry(key).or_default().push(val_a);
}
for (key, val_b) in iter_b {
if let Some(vals_a) = items_a.get(&key) {
@@ -115,7 +115,7 @@ where
black_box((key, val_a, val_b.clone()));
}
}
- items_b.entry(key).or_insert_with(Vec::new).push(val_b);
+ items_b.entry(key).or_default().push(val_b);
}
});
},
diff --git a/benches/benches/micro_ops.rs b/benches/benches/micro_ops.rs
index bc548bbd53e..422613750b5 100644
--- a/benches/benches/micro_ops.rs
+++ b/benches/benches/micro_ops.rs
@@ -188,7 +188,7 @@ fn ops(c: &mut Criterion) {
#[allow(clippy::unnecessary_fold)]
{
hydroflow_syntax! {
- source_iter(black_box(input0)) -> fold::<'tick>(0, |accum, elem| { accum + elem }) -> for_each(|x| { black_box(x); });
+ source_iter(black_box(input0)) -> fold::<'tick>(0, |accum: &mut _, elem| { *accum += elem }) -> for_each(|x| { black_box(x); });
}
}
},
@@ -275,25 +275,25 @@ fn ops(c: &mut Criterion) {
let mut df = hydroflow_syntax! {
source_iter(black_box(DATA)) -> persist()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
-> for_each(|x| { black_box(x); });
};
@@ -308,25 +308,25 @@ fn ops(c: &mut Criterion) {
let mut df = hydroflow_syntax! {
source_iter(black_box(DATA)) -> persist()
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
- -> next_tick()
+ -> defer_tick()
-> map(black_box)
-> for_each(|x| { black_box(x); });
};
diff --git a/benches/benches/reachability.rs b/benches/benches/reachability.rs
index 2737d74796e..7d68e6787b7 100644
--- a/benches/benches/reachability.rs
+++ b/benches/benches/reachability.rs
@@ -14,14 +14,14 @@ lazy_static::lazy_static! {
let cursor = Cursor::new(include_bytes!("reachability_edges.txt"));
let reader = BufReader::new(cursor);
- let mut edges = HashMap::new();
+ let mut edges = HashMap::<_, Vec<_>>::new();
for line in reader.lines() {
let line = line.unwrap();
let mut nums = line.split_whitespace();
let a = nums.next().unwrap().parse().unwrap();
let b = nums.next().unwrap().parse().unwrap();
assert!(nums.next().is_none());
- edges.entry(a).or_insert_with(Vec::new).push(b);
+ edges.entry(a).or_default().push(b);
}
edges
};
diff --git a/benches/benches/symmetric_hash_join.rs b/benches/benches/symmetric_hash_join.rs
index b1221b8d9e4..1834c767cc2 100644
--- a/benches/benches/symmetric_hash_join.rs
+++ b/benches/benches/symmetric_hash_join.rs
@@ -1,7 +1,7 @@
use std::hint::black_box;
use criterion::{criterion_group, criterion_main, Criterion};
-use hydroflow::compiled::pull::{SetJoinState, SymmetricHashJoin};
+use hydroflow::compiled::pull::{symmetric_hash_join_into_iter, HalfSetJoinState};
use rand::distributions::Distribution;
use rand::rngs::StdRng;
use rand::SeedableRng;
@@ -14,11 +14,14 @@ fn ops(c: &mut Criterion) {
let rhs: Vec<_> = (0..3000).map(|v| (v + 50000, ())).collect();
b.iter(|| {
- let mut state = black_box(SetJoinState::default());
- let join = SymmetricHashJoin::new(
+ let (mut lhs_state, mut rhs_state) =
+ black_box((HalfSetJoinState::default(), HalfSetJoinState::default()));
+ let join = symmetric_hash_join_into_iter(
black_box(lhs.iter().cloned()),
black_box(rhs.iter().cloned()),
- &mut state,
+ &mut lhs_state,
+ &mut rhs_state,
+ false,
);
for v in join {
@@ -32,11 +35,14 @@ fn ops(c: &mut Criterion) {
let rhs: Vec<_> = (0..3000).map(|v| (v, v + 50000)).collect();
b.iter(|| {
- let mut state = black_box(SetJoinState::default());
- let join = SymmetricHashJoin::new(
+ let (mut lhs_state, mut rhs_state) =
+ black_box((HalfSetJoinState::default(), HalfSetJoinState::default()));
+ let join = symmetric_hash_join_into_iter(
black_box(lhs.iter().cloned()),
black_box(rhs.iter().cloned()),
- &mut state,
+ &mut lhs_state,
+ &mut rhs_state,
+ false,
);
for v in join {
@@ -50,13 +56,15 @@ fn ops(c: &mut Criterion) {
let rhs: Vec<_> = (0..3000).map(|v| (v, v)).collect();
b.iter(|| {
- let mut state = black_box(SetJoinState::default());
- let join = SymmetricHashJoin::new(
+ let (mut lhs_state, mut rhs_state) =
+ black_box((HalfSetJoinState::default(), HalfSetJoinState::default()));
+ let join = symmetric_hash_join_into_iter(
black_box(lhs.iter().cloned()),
black_box(rhs.iter().cloned()),
- &mut state,
+ &mut lhs_state,
+ &mut rhs_state,
+ false,
);
-
for v in join {
black_box(v);
}
@@ -77,11 +85,14 @@ fn ops(c: &mut Criterion) {
.collect();
b.iter(|| {
- let mut state = black_box(SetJoinState::default());
- let join = SymmetricHashJoin::new(
+ let (mut lhs_state, mut rhs_state) =
+ black_box((HalfSetJoinState::default(), HalfSetJoinState::default()));
+ let join = symmetric_hash_join_into_iter(
black_box(lhs.iter().cloned()),
black_box(rhs.iter().cloned()),
- &mut state,
+ &mut lhs_state,
+ &mut rhs_state,
+ false,
);
for v in join {
@@ -105,11 +116,14 @@ fn ops(c: &mut Criterion) {
.collect();
b.iter(|| {
- let mut state = black_box(SetJoinState::default());
- let join = SymmetricHashJoin::new(
+ let (mut lhs_state, mut rhs_state) =
+ black_box((HalfSetJoinState::default(), HalfSetJoinState::default()));
+ let join = symmetric_hash_join_into_iter(
black_box(lhs.iter().cloned()),
black_box(rhs.iter().cloned()),
- &mut state,
+ &mut lhs_state,
+ &mut rhs_state,
+ false,
);
for v in join {
diff --git a/build_docs.bash b/build_docs.bash
index a004ae84698..00c5768d513 100644
--- a/build_docs.bash
+++ b/build_docs.bash
@@ -1,6 +1,8 @@
set -e
-wget -qO- https://github.com/llvm/llvm-project/releases/download/llvmorg-13.0.0/clang+llvm-13.0.0-x86_64-linux-gnu-ubuntu-16.04.tar.xz | tar xJ
+PLATFORM=${1:-"x86_64-linux-gnu-ubuntu-16.04"}
+
+wget -qO- https://github.com/llvm/llvm-project/releases/download/llvmorg-13.0.0/clang+llvm-13.0.0-$PLATFORM.tar.xz | tar xJ
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
@@ -10,10 +12,10 @@ curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
cd website_playground
-CARGO_CFG_HYDROFLOW_GENERATE_DOCS="1" RUSTFLAGS="--cfg procmacro2_semver_exempt" CC="$PWD/../clang+llvm-13.0.0-x86_64-linux-gnu-ubuntu-16.04/bin/clang" wasm-pack build
+CARGO_CFG_HYDROFLOW_GENERATE_DOCS="1" RUSTFLAGS="--cfg procmacro2_semver_exempt --cfg super_unstable" CC="$PWD/../clang+llvm-13.0.0-$PLATFORM/bin/clang" wasm-pack build
cd ../docs
npm ci
-npm run build
+LOAD_PLAYGROUND=1 npm run build
diff --git a/design_docs/2023-07_lattice_properties.md b/design_docs/2023-07_lattice_properties.md
new file mode 100644
index 00000000000..7e7b09f0f81
--- /dev/null
+++ b/design_docs/2023-07_lattice_properties.md
@@ -0,0 +1,268 @@
+# Lattice Properties
+
+## Goals
+
+1. Make monotonicity the easy and default option, make non-monotonic operations the special case.
+2. Reject operations that are incorrect (would violate monotonicity).
+ E.g. can't use an order-dependent fold on an arbitrarily ordered stream.
+3. Reason about and optimize Hydroflow graphs at proc-macro time.
+ What portions can be parallelized, partitioned, etc.
+
+## Design
+
+Introduce _stream types_, as a layer on top of lattice types. The stream type represents sequential
+information about the lattice instances, such as ordering, sorting, monotonicity, or atomization.
+
+* `SeqFlow<*, T>`
+* `LatticeFlow`
+* `DiffLatticeFlow`
+* `CumuLatticeFlow`
+
+`SeqFlow` is a special per-element representation of the `Seq<*, T>` lattice type.
+
+Stream types are **NOT** automatically inferred. It will be up to the user to explicitly switch
+between different stream types.
+An alternative, using Rust's type system to infer stream types, is too fragile and more importantly
+cannot be used at proc-macro time, which prevents goal #3. Having the user manually specify stream
+types ensures the scaling and monotonicity of the system will be top-of-mind, and avoids the
+complexity of implementing our own type inference system.
+
+Stream type topology. Stream types can be cast upwards:
+```mermaid
+flowchart BT
+seq["SeqFlow<*, T>"]
+lat["LatticeFlow<Lat>"]
+dif["DiffLatticeFlow<Lat>"]
+cum["CumuLatticeFlow<Lat>"]
+
+cum --> lat
+dif --> lat
+lat --> seq
+```
+
+Monotonic function topology:
+```mermaid
+flowchart BT
+any["any 'function'"]
+fun["deterministic function"]
+mono["monotonic function"]
+morph["morphism"]
+
+morph --> mono
+mono --> fun
+fun --> any
+```
+
+---
+
+Sending bottom $\bot$ through a [lattice flow] stream should have the exact same behavior as sending nothing
+through.
+
+
+ Note: bottom in a SeqStream is not SeqStream's bottom
+
+```rust
+Seq = VecUnion>
+Seq bottom = vec![]
+vec![bottom, bottom, bottom] is not Seq's bottom
+```
+
+
+## Operators
+
+```mermaid
+flowchart BT
+map_fn
+seq["SeqFlow<*, T>"]
+lat["LatticeFlow<Lat>"]
+dif["DiffLatticeFlow<Lat>"]
+cum["CumuLatticeFlow<Lat>"]
+
+cum --> lat
+dif --> lat
+lat --> seq
+```
+
+| Input(s) | Operator | Output(s) | Condition |
+| --- | --- | --- | --- |
+| `SeqFlow<*1, T>` | `map(f)` | `SeqFlow<*2, U>` | `f: Fn(T) -> U` |
+| `LatticeFlow` | `map(f)` | `LatticeFlow` | `f: Fn(Lat1) -> Lat2` |
+| `DiffLatticeFlow` | `map(f)` | `DiffLatticeFlow` | `f: Morphism(Lat1) -> Lat2` |
+| `CumuLatticeFlow` | `map(f)` | `CumuLatticeFlow` | `f: MonotonicFn(Lat1) -> Lat2` |
+| | | |
+| `SeqFlow<*1, T>` | `filter(p)` | `SeqFlow<*2, T>` | `p: Fn(&T) -> bool` |
+| `LatticeFlow` | `filter(p)` | `LatticeFlow` | `p: Morphism(&T) -> boolxxxx` |
+| `DiffLatticeFlow` | `filter(p)` | `DiffLatticeFlow` | `p: Morphism(&T) -> boolxxxx` |
+| `CumuLatticeFlow` | `filter(p)` | `CumuLatticeFlow` | `p: MonotonicFn(&T) -> Max` |
+| | | |
+| `SeqFlow<*1, (K, V1)>`, `SeqFlow<*2, (K, V2)>` | `join()` | `SeqFlow<*3, (K, (V1, V2))>` | |
+| | | |
+| `LatticeFlow` | `unmerge()` | `DiffLatticeFlow` | |
+| `LatticeFlow` | `merge()` | `CumuLatticeFlow` | |
+| any, $N$ times | `union()` | same out | |
+| any | `tee()` | same out, $N$ times | |
+| | | |
+| `SeqFlow<*1, T>` | `sort()` | `SeqFlow<*SORT, T>` | |
+| `LatticeFlow` | `sort()` | `SeqFlow<*SORT, Lat>` | |
+| | | |
+
+---
+
+| Input(s) | Operator | Output(s) | Condition |
+| --- | --- | --- | --- |
+| `SeqFlow<*1, T>` | `filter(p)` | `SeqFlow<*2, T>` | `p: Fn(&T) -> bool` |
+| `CumuLatticeFlow` | `filter(p)` | `CumuLatticeFlow` | `p: Fn(&T) -> bool` |
+| `[Diff]LatticeFlow` | `filter(p)` | `[Diff]LatticeFlow` | `p: Fn(&T) -> bool` |
+
+```rust
+// filter has CumuLatticeFlow input
+input
+ -> merge()
+ // Legal
+ // Good, monotonic fn
+ -> filter(|set: HashSetUnion| set.contains("hello"))
+ -> output
+
+// VS
+
+// filter has DiffLatticeFlow input
+input
+ // Good, monotonic fn (??morphism??)
+ -> filter(|set: SingletonSetUnion| set.contains("hello"))
+ -> merge()
+ -> output
+
+// VS
+
+// filter has DiffLatticeFlow input
+input
+ -> merge_batch()
+ // Non-deterministic
+ -> filter(|set: HashSetUnion| set.contains("hello"))
+ -> merge()
+ -> output
+```
+
+---
+
+With a `[Diff]LatticeFlow`
+
+`[Diff]LatticeFlow` should be isomorphic to a cumulative lattice flow.
+
+---
+
+`filter(P)` is equivalent to `map(|x| x.keep(P))` ?
+
+$$
+ f(a \sqcup_S b) \quad=\quad f(a) \sqcup_T f(b)
+ \quad\quad\quad\mathrm{\textit{(morphism)}}
+$$
+
+```rust
+// filter has CumuLatticeFlow input
+input
+ -> merge()
+ // Legal
+ // Good, monotonic fn
+ -> filter(|set: HashSetUnion| set.contains("hello"))
+ -> output
+
+// VS
+
+// filter has DiffLatticeFlow input
+input
+ // Good, monotonic fn (??morphism??)
+ -> filter(|set: SingletonSetUnion| set.contains("hello"))
+ -> merge()
+ -> output
+
+// VS
+
+// filter has DiffLatticeFlow input
+input
+ -> merge_batch()
+ // Non-deterministic
+ -> filter(|set: HashSetUnion| set.contains("hello"))
+ -> merge()
+ -> output
+```
+
+---
+
+```rust
+// filter has CumuLatticeFlow input
+input
+ -> merge()
+ // Good, monotonic
+ -> filter(|set: HashSetUnion| set.len() > 10)
+ -> output
+
+// VS
+
+// filter has DiffLatticeFlow input
+input
+ // singleton, silly
+ -> filter(|set: SingletonSetUnion| set.len() > 10)
+ -> merge()
+ -> output
+
+// VS
+
+// filter has DiffLatticeFlow input
+input
+ -> merge_batch()
+ // bad, non deterministic
+ -> filter(|set: HashSetUnion| set.len() > 10)
+ -> merge()
+ -> output
+```
+
+---
+
+```rust
+input
+ -> merge()
+ // flat_map?
+ -> map(|hash_set| hash_set.keep(|x| x.starts_with("hello")))
+ -> output
+
+// vs
+
+input
+ -> filter(|SingletonSet(x)| x.starts_with("hello"))
+ -> merge()
+ -> output
+```
+
+---
+
+
+```rust
+// filter has LatticeFlow input
+input -> merge_batch() -> filter(P2) -> merge() -> output
+```
+
+```rust
+P1 = |SingletonSet(x)| x.starts_with("hello")
+P2 = |hash_set| hash_set.starts_with("hello")
+```
+
+```rust
+P1 = |set| set.len() < 10
+P2 = |set| set.len() < 10
+```
+
+
+
+
+```rust
+// filter has CumuLatticeFlow input
+input -> merge() -> filter(P1) -> output
+
+// VS
+
+// filter has [Diff]LatticeFlow input
+input -> filter(P2) -> merge() -> output
+```
+
+
diff --git a/design_docs/2023-08_lattice_properties.md b/design_docs/2023-08_lattice_properties.md
new file mode 100644
index 00000000000..80b4653b104
--- /dev/null
+++ b/design_docs/2023-08_lattice_properties.md
@@ -0,0 +1,334 @@
+# Lattice Properties
+
+## Goals
+
+1. Make monotonicity the easy and default option, make non-monotonic operations the special case.
+2. Reject operations that are incorrect (would violate monotonicity/determinism).
+ E.g. can't use an order-dependent fold on an arbitrarily ordered stream.
+3. Reason about and optimize Hydroflow graphs at proc-macro time.
+ What portions can be parallelized, partitioned, etc.
+
+## Design
+
+Introduce _stream types_, as a layer on top of lattice types. The stream type represents sequential
+information about the lattice instances, such as ordering, sorting, monotonicity, or atomization.
+
+* `SeqFlow<*, T>`
+* `LatticeFlow`
+* `CumuLatticeFlow`
+
+`SeqFlow` is a special per-element representation of the `Seq<*, T>` lattice type.
+
+Stream types are **NOT** automatically inferred. It will be up to the user to explicitly switch
+between different stream types.
+An alternative, using Rust's type system to infer stream types, is too fragile and more importantly
+cannot be used at proc-macro time, which prevents goal #3. Having the user manually specify stream
+types ensures the scaling and monotonicity of the system will be top-of-mind, and avoids the
+complexity of implementing our own type inference system.
+
+
+Items flowing through lattice flows are _lattice points_, not atoms. Not all lattices are
+atomizable, and we want to have a lattice-first perspective.
+
+
+
+Stream type topology. Stream types can be cast upwards:
+```mermaid
+flowchart BT
+seq["SeqFlow<*, T>"]
+del["DeltaLatticeFlow<Lat>"]
+cum["CumuLatticeFlow<Lat>"]
+atom["Atom lattice flow???"]
+
+del --> seq
+cum --> seq
+atom --> del
+```
+
+```mermaid
+flowchart LR
+del["DeltaLatticeFlow<Lat>"]
+cum["CumuLatticeFlow<Lat>"]
+
+del --> merge --> cum --> delta --> del
+```
+
+Monotonic function topology:
+```mermaid
+flowchart BT
+any["any 'function'"]
+fun["deterministic function"]
+mono["monotonic function"]
+morph["morphism"]
+
+morph --> mono
+mono --> fun
+fun --> any
+```
+
+---
+
+Sending bottom $\bot$ through a [lattice flow] stream should have the exact same behavior as sending nothing
+through.
+
+
+ Note: bottom in a SeqStream is not SeqStream's bottom
+
+```rust
+Seq = VecUnion>
+Seq bottom = vec![]
+vec![bottom, bottom, bottom] is not Seq's bottom
+```
+
+
+## Operators
+
+```rust
+// input: set {1, 2, 3, 4}
+
+// map stream
+input -> random_batches()
+ // input: { 1 }, { 2 }, { 3 }, { 4 }
+ // NOT A MORPHISM ILLEGAL
+ // the map function is a set union morphism if it acts on the atoms.
+ -> map(|x: Set| if x.all(is_even) { OptionSet(x) } else { OptionSet(None) }) -> output
+ // { 2 }, { 4 }
+
+// filter stream
+input -> atomize()
+ // input: { 1 }, { 2 }, { 3 }, { 4 }
+ -> filter(|x: Set| if x.all(is_even)) -> output
+ // { 2 }, { 4 }
+```
+
+## TODO: start with cumul thing
+
+```rust
+// input: set {1, 2, 3, 4}
+
+// map stream
+input
+ -> map(|x: Set| if x.all(is_even) { OptionSet(x) } else { OptionSet(None) }) -> output
+
+// filter stream
+input
+ -> filter(|x: (x)| 0 == x % 2) -> output
+ // { 2 }, { 4 }
+```
+
+| **Input(s)** | **Operator** | **Output(s)** | **Condition** |
+| --- | --- | --- | --- |
+| `SeqFlow<*1, T>` | `map(f)` | `SeqFlow<*2, U>` | `f: Fn(T) -> U` |
+| `CumuLatticeFlow` | `map(f)` | `CumuLatticeFlow` | `f: MonotonicFn(Lat1) -> Lat2` |
+| `LatticeFlow` | `map(f)` | `LatticeFlow` | `f: Morphism(Lat1) -> Lat2` |
+|
+| **Input(s)** | **Operator** | **Output(s)** | **Condition** |
+| `SeqFlow<*1, T>` | `filter(p)` | `SeqFlow<*2, T>` | `p: Fn(&T) -> bool` |
+| `CumuLatticeFlow` | `filter(p)` | `CumuLatticeFlow` | `f: MonotonicFn(&Lat) -> Max` |
+| `LatticeFlow` | `filter(p)` | Nope | no meaningful filter morphisms exist. Use `map` (convert atoms to bot) instead. |
+|
+| **Input(s)** | **Operator** | **Output(s)** | **Condition** |
+| `SeqFlow<*1, T>` | `filter_map(f)` | `SeqFlow<*2, U>` | `f: Fn(T) -> Option` |
+| `CumuLatticeFlow` | `filter_map(f)` | `CumuLatticeFlow` | `f: MonotonicFn(&Lat1) -> WithBot` |
+| `LatticeFlow` | `filter_map(f)` | Nope | see `filter(p)` for explanation |
+|
+| **Input(s)** | **Operator** | **Output(s)** | **Condition** |
+| `LatticeFlow` | `debottom()` | `LatticeFlow` | `Lat: Debottom` |
+|
+| **Input(s)** | **Operator** | **Output(s)** | **Condition** |
+| `SeqFlow<*1, T>` | `tee()` | $N\times$ `SeqFlow<*1, T>` | |
+| `LatticeFlow` | `tee()` | $N\times$ `LatticeFlow` | |
+| `CumuLatticeFlow` | `tee()` | $N\times$ `CumuLatticeFlow` | |
+|
+| **Input(s)** | **Operator** | **Output(s)** | **Condition** |
+| $N\times$ `SeqFlow<***, T>` | `union()` | `SeqFlow<*out, T>` | |
+| $N\times$ `LatticeFlow` | `union()` | `LatticeFlow` | |
+| $N\times$ `CumuLatticeFlow` | `union()` | `LatticeFlow` | Note: no longer `Cumu`. Mingwei: Unnatural? |
+|
+| **Input(s)** | **Operator** | **Output(s)** | **Condition** |
+| `SeqFlow<*1, T>` | `fold(init, f)` | `SeqFlow<*2, U>` | `init: Fn() -> U`
`fold: Fn(U, T) -> U`
+| `LatticeFlow` | `lattice_fold::()` | `CumuLatticeFlow` | `Lat2: Default` |
+| `CumuLatticeFlow` | `lattice_fold::()` | `CumuLatticeFlow` | Silly, equivalent to just `map` with convert |
+|
+| **Input(s)** | **Operator** | **Output(s)** | **Condition** |
+| `SeqFlow<*1, T>`
`SeqFlow<*2, U>` | `cross_join()` | `SeqFlow<*3, (T, U)>` | |
+|
+| `CumuLatticeFlow`
`CumuLatticeFlow` | `lattice_binary_map(f)` | `CumuLatticeFlow` | `f: BinaryMonotonicFn(Lat1, Lat2) -> Lat3` |
+| `LatticeFlow`
`LatticeFlow` | `lattice_cross_join(f)` | `LatticeFlow` | `f: BinaryMorphism(Lat1, Lat2) -> Lat3` |
+|
+| `CumuLatticeFlow`
`LatticeFlow` | `lattice_half_binary_map(f)` | `LatticeFlow` | silly?
`f: BinaryMorphismRight(Lat1, Lat2) -> Lat3` |
+| `LatticeFlow`
`CumuLatticeFlow` | `lattice_half_binary_map(f)` | `LatticeFlow` | silly?
`f: BinaryMorphismLeft(Lat1, Lat2) -> Lat3` |
+|
+| `SeqFlow<*1, (K, V1)>`
`SeqFlow<*2, (K, V2)>` | `join()` | `SeqFlow<*3, (K, (V1, V2))>` | |
+| `LatticeFlow`
`LatticeFlow` | `lattice_cross_join(keyed(f))`
`NEW_lattice_join(f)` | `LatticeFlow>` | `f: BinaryMorphism(Lat1, Lat2) -> Lat3` |
+|
+| **Input(s)** | **Operator** | **Output(s)** | **Condition** |
+| `SeqFlow<*1, T>` | `CAST` | `SeqFlow<*1, T>` | ✅ |
+| `SeqFlow<*1, Lat>` | `CAST` | `LatticeFlow` | ❌ |
+| `SeqFlow<*1, Lat>` | `CAST` | `CumuLatticeFlow` | ❌ |
+| `LatticeFlow` | `CAST` | `SeqFlow<*1, Lat>` | ✅ |
+| `LatticeFlow` | `CAST` | `LatticeFlow` | ✅ |
+| `LatticeFlow` | `CAST` | `CumuLatticeFlow` | ❌ |
+| `CumuLatticeFlow` | `CAST` | `SeqFlow<*1, Lat>` | ✅ |
+| `CumuLatticeFlow` | `CAST` | `LatticeFlow` | ✅ |
+| `CumuLatticeFlow` | `CAST` | `CumuLatticeFlow` | ✅ |
+|
+| **Input(s)** | **Operator** | **Output(s)** | **Condition** |
+| `SeqFlow<*1, T>` | `sort()` | `SeqFlow, T>` | |
+| `LatticeFlow` | `sort()` | `LatticeFlow` | |
+
+$
+\texttt{BinaryFn:}\quad R\times S \rightarrow T
+$
+
+$
+ \texttt{BinaryMonotonicFn:}\quad
+ a \sqsubseteq_R b,\;\; x \sqsubseteq_S y
+ \quad\Longrightarrow\quad
+ f(a, x)\ \sqsubseteq_T f(b, y)
+$
+
+
+$
+ \texttt{BinaryMorphism:}\quad\\
+ f(a\,\sqcup_R\,b,\; x) \quad=\quad f(a, x)\ \sqcup_T\ f(b, x) \\
+ f(a,\; x\,\sqcup_S\,y) \quad=\quad f(a, x)\ \sqcup_T\ f(a, y)
+$
+
+Example: join
+
+$
+ join((a \sqcup b), x) = join(a, x) \sqcup join(b, x)
+$
+
+$
+ (A \cup B) \bowtie (X \cup Y) = (A \bowtie X) \cup (A \bowtie Y) \cup (B \bowtie X) \cup (B \bowtie Y)
+$
+
+Lattice cross join:
+
+
+$ f(\bigsqcup_tA \sqcup \delta_t a,\ \bigsqcup_t X) \rightarrow (\delta_t a,\ \bigsqcup_t X) $
+
+$ f(\bigsqcup_t A,\ \bigsqcup_tX \sqcup \delta_t x) \rightarrow (\bigsqcup_t A,\ \delta_t x) $
+
+---
+
+```rust
+source_iter(...) -> map(f) -> for_each(|x| println!("{:?}", x))
+
+source_iter(...) -> deltify() -> dest_stream() ....
+-> source_stream() -> merge() -> map(f) -> for_each(|x| println!("{:?}", x))
+```
+
+---
+
+Users deal with `SeqFlow<*, T>` or `CumuLatticeFlow`
+
+`SeqFlow<*, T>`
+`LatticeFlow` -> `SeqFlow<*, Lat>`
+`DeltaLatticeFlow` is deltification of cumulative lattice flow
+
+```rust
+ --LF--> map(morph) --LF-->
+ = --LF--> Deltify() --DLF--> Merge() --LF--> map(morph) --LF-->
+ = --LF--> Deltify() --DLF--> map(morph) --DLF--> Merge() --LF-->
+
+```
+
+```rust
+--LF--> Deltify() --DLF--> map(Non_Morphism()) --SeqFlow<*, Lat2>--> merge() --LF-->
+!=
+--LF--> Deltify() --DLF--> merge() --LF<2>--> map(Non_Morphism()) --LF-->
+```
+
+```rust
+source_iter([cumu]) --LF--> map(f) --LF--> for_each(|x| println!("{:?}", x))
+
+source_iter([cumu] --LF--> deltify() --DLF<*1, L>--> dest_stream() ....
+-> source_stream() --DLF<*2, L>--> merge() --DLF<*3, L>--> map(f) -> for_each(|x| println!("{:?}", x))
+```
+
+# Postponement stack:
+* `AtomLatticeFlow`, should it exist?
+* `Debottom`, bottomless lattices
+* dependent sort orders (nested loops join)
+
+| **Input(s)** | **Operator** | **Output(s)** | **Condition** |
+| --- | --- | --- | --- |
+| `LatticeFlow` | `unmerge()` | `LatticeFlow` | |
+| `LatticeFlow` | `merge()` | `CumuLatticeFlow` | |
+| | | |
+| `SeqFlow<*1, T>` | `sort()` | `SeqFlow<*SORT, T>` | |
+| `LatticeFlow` | `sort()` | `SeqFlow<*SORT, Lat>` | |
+| | | |
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+---
+
+# junk stack
+
+```rust
+source_iter(vec![("a", 1), ("a", 2), ("b", 1), ("b", 3)])
+ -> fold_keyed(MapUnion::new(), |accum, elem| { accum.merge(MapUnion::from(elem.0, elem.1))})
+ // { "a": MaxInt<2>, "b": MaxInt<3> }
+ -> flatten()
+ // { ("a", MaxInt<2>), ("b", MaxInt<3>)}
+ -> lattice_fold(MaxInt::Bot() )//, |accum, elem| {accum.merge(elem.1)})
+ -> assert_eq(MaxInt::from(3))
+```
+
+```rust
+source_iter(vec![("a", 1), ("a", 2), ("b", 1), ("b", 2)])
+ -> fold_keyed(MapUnion::new(), |accum, elem| { accum.merge(MapUnion::from(elem.0, elem.1))})
+ // { "a": 2, "b": 2 }
+ -> flat_map(|map_union| map_union.values())
+ -> lattice_merge()
+```
+
+
+
+
+```rust
+SeqFlow -> filter(|x| x != "hello") ->
+
+LatticeFlow>
+ -> map(|SingletonSet(x)| if x.starts_with("hello") {
+ OptionSet::new(x)
+ } else {
+ OptionSet::new(None)
+ })
+ ->
+```
+
+```rust
+LatticeFlow> // Max
+ -> map(|Max(x)| if 0 == x % 2 {
+ WithBot::new(x)
+ } else {
+ WithBot::new(None)
+ }) // WithBot>
+ -> debottom()
+ // Max
+ -> map(|Max(x)| /* do something else */)
+```
+```rust
+WithBot
+OptionSet /* equivalent to */ WithBot>
+```
\ No newline at end of file
diff --git a/docs/README.md b/docs/README.md
index d37d4f2130c..1874a63678a 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -4,20 +4,45 @@ This website is built using [Docusaurus 2](https://docusaurus.io/), a modern sta
You'll need Node installed to build the website. First, install the necessary dependencies:
```bash
-$ npm install
+$ npm ci
```
-Next, you'll need to build the WebAssembly components of the website. This requires Rust and [wasm-pack](https://rustwasm.github.io/wasm-pack/):
+Finally, you can run the website locally:
+
+```bash
+$ npm run start
+```
+
+## Building the Playground
+By default, the Hydroflow / Datalog playgrounds are not loaded when launching the website. To build the playground, you'll need to follow a couple additional steps. This requires Rust and [wasm-pack](https://rustwasm.github.io/wasm-pack/):
```bash
$ rustup target add wasm32-unknown-unknown
$ cargo install wasm-pack
$ cd ../website_playground
-$ CARGO_CFG_HYDROFLOW_GENERATE_DOCS="1" wasm-pack build
+$ CARGO_CFG_HYDROFLOW_GENERATE_DOCS="1" RUSTFLAGS="--cfg procmacro2_semver_exempt --cfg super_unstable" wasm-pack build
```
-Finally, you can run the website locally:
+### Notes on building on macOS
+If you're building on macOS, you may need to install the `llvm` package with Homebrew (because the default toolchain has WASM support missing):
```bash
-$ npm run start
+$ brew install llvm
```
+
+Then, you'll need to set `TARGET_CC` and `TARGET_AR` environment variables when building the playground:
+
+```bash
+$ TARGET_CC="$(brew --prefix)/opt/llvm/bin/clang" TARGET_AR="$(brew --prefix)/opt/llvm/bin/llvm-ar" CARGO_CFG_HYDROFLOW_GENERATE_DOCS="1" RUSTFLAGS="--cfg procmacro2_semver_exempt --cfg super_unstable" wasm-pack build
+```
+
+With the WASM portion built, we can launch the website with the playground loaded:
+
+```bash
+$ LOAD_PLAYGROUND=1 npm run start
+```
+
+## Adding Papers
+1. Upload the paper PDF to the `static/papers` folder.
+2. Run the script `./extract-paper-thumbnails` (from this `docs` directory), which requires [ImageMagick to be installed](https://imagemagick.org/script/download.php).
+3. Go to `src/pages/research.js` and add the paper to the array at the top of the file.
diff --git a/docs/docs/deploy/index.md b/docs/docs/deploy/index.md
index 1d2e5870cfb..6e057864a7c 100644
--- a/docs/docs/deploy/index.md
+++ b/docs/docs/deploy/index.md
@@ -3,6 +3,13 @@ sidebar_position: 1
---
# Introduction
+
+:::caution
+
+Hydro Deploy is currently in alpha. Although it has already been used for large-scale distributed experiments within the Hydro research group, the API is rapidly changing and may come with some rough edges, and many pieces are not yet documented. Please reach out if you run into any issues!
+
+:::
+
Hydro comes equipped with a built-in deployment system, **Hydro Deploy**, which allows you to deploy your Hydro app to a variety of platforms. With Hydro Deploy, you can spin up complex services with just a few lines of Python! This guide will walk you through the process of deploying your Hydro app in a variety of scenarios.
Hydro Deploy focuses on managing the end-to-end lifecycle of networked services in the cloud. It is not a general-purpose deployment tool, and is not intended to replace systems like Docker Compose or Kubernetes. Instead, Hydro Deploy is designed to be used in conjunction with these tools to manage the lifecycle of your Hydro app.
@@ -14,8 +21,3 @@ Currently, Hydro Deploy is focused on _ephemeral applications_, which can be spu
- Initializing network connections based on a user-defined topology
- Monitoring logs from your services
-:::caution
-
-Hydro Deploy is currently in alpha. Although it has already been used for large-scale distributed experiments within the Hydro research group, the API is rapidly changing and may come with some rough edges. Please reach out if you run into any issues!
-
-:::
diff --git a/docs/docs/deploy/install.md b/docs/docs/deploy/install.md
index 68783075688..b62873c7ee8 100644
--- a/docs/docs/deploy/install.md
+++ b/docs/docs/deploy/install.md
@@ -64,5 +64,5 @@ To check that Hydro Deploy is installed correctly, you can run the following com
```console
#shell-command-next-line
hydro --version
-hydro_cli 0.0.0
+Hydro Deploy 0.1.0
```
diff --git a/docs/docs/hydroflow/architecture/index.md b/docs/docs/hydroflow/architecture/index.mdx
similarity index 81%
rename from docs/docs/hydroflow/architecture/index.md
rename to docs/docs/hydroflow/architecture/index.mdx
index dd7b93550b7..cbd4461d79f 100644
--- a/docs/docs/hydroflow/architecture/index.md
+++ b/docs/docs/hydroflow/architecture/index.mdx
@@ -1,3 +1,6 @@
+import exampleOutput from '!!raw-loader!../../../../hydroflow/tests/snapshots/surface_examples__example_5_reachability.snap';
+import { extractMermaid } from '../../../src/util';
+
# Architecture
Hydroflow graphs are divided into two layers: the outer _scheduled layer_ and
@@ -9,39 +12,7 @@ and handoffs, while the insides of the gray boxes are compiled via the compiled
here is the graph from the
[reachability](../quickstart/example_5_reachability) chapter, which we will use a running example:
-```mermaid
-%%{init: {'theme': 'base', 'themeVariables': {'clusterBkg':'#ddd'}}}%%
-flowchart TD
-classDef pullClass fill:#02f,color:#fff,stroke:#000
-classDef pushClass fill:#ff0,stroke:#000
-linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em;
-subgraph "sg_1v1 stratum 0"
- 1v1[\"(1v1) source_iter(vec! [0])"/]:::pullClass
- 2v1[\"(2v1) source_stream(edges_recv)"/]:::pullClass
- 3v1[\"(3v1) union()"/]:::pullClass
- 7v1[\"(7v1) map(| v | (v, ()))"/]:::pullClass
- 4v1[\"(4v1) join()"/]:::pullClass
- 5v1[/"(5v1) flat_map(| (src, ((), dst)) | [src, dst])"\]:::pushClass
- 6v1[/"(6v1) tee()"\]:::pushClass
- 10v1["(10v1) handoff"]:::otherClass
- 10v1--1--->3v1
- 1v1--0--->3v1
- 2v1--1--->4v1
- 3v1--->7v1
- 7v1--0--->4v1
- 4v1--->5v1
- 5v1--->6v1
- 6v1--0--->10v1
-end
-subgraph "sg_2v1 stratum 1"
- 8v1[/"(8v1) unique()"\]:::pushClass
- 9v1[/"(9v1) for_each(| x | println! ("Reached: {}", x))"\]:::pushClass
- 8v1--->9v1
-end
-6v1--1--->11v1
-11v1["(11v1) handoff"]:::otherClass
-11v1===o8v1
-```
+
The [Hydroflow Architecture Design Doc](https://hydro-project.github.io/hydroflow/design_docs/2021-10_architecture_design_doc.html)
contains a more detailed explanation of this section. Note that some aspects of
@@ -107,7 +78,7 @@ convert a graph into in-out trees.
Hydroflow's _Surface Syntax_ hides
the distinction between these two layers. It offers a natural `Iterator`-like chaining syntax for building
-graphs that get parsed and compiled into a scheduled graph of one or more compiled subgraphs. Please see the [Surface Syntax](../syntax/index) docs for more information.
+graphs that get parsed and compiled into a scheduled graph of one or more compiled subgraphs. Please see the [Surface Syntax](../syntax/) docs for more information.
Alternatively, the _Core API_ allows you to interact with handoffs directly at a low
level. It doesn't provide any notion of chainable operators. You can use Rust `Iterator`s
diff --git a/docs/docs/hydroflow/concepts/cyclic_flows.md b/docs/docs/hydroflow/concepts/cyclic_flows.md
new file mode 100644
index 00000000000..e8ac836d52a
--- /dev/null
+++ b/docs/docs/hydroflow/concepts/cyclic_flows.md
@@ -0,0 +1,49 @@
+---
+sidebar_position: 2
+---
+
+import CodeBlock from '@theme/CodeBlock';
+import exampleCode from '!!raw-loader!../../../../hydroflow/examples/example_5_reachability.rs';
+import exampleCode2 from '!!raw-loader!../../../../hydroflow/examples/example_naturals.rs';
+import { getLines, extractOutput, extractMermaid } from '../../../src/util';
+
+# Dataflow Cycles and Fixpoints
+Many dataflow libraries only support acyclic flow graphs (DAGs); Hydroflow goes further and supports cycles. Hydroflow's semantics for cyclic flows are based on the formal foundations of recursive queries in the [Datalog](https://en.wikipedia.org/wiki/Datalog) language, which also influenced the design of recursive query features in SQL.
+
+The basic pattern for cycles in Hydroflow looks something like this:
+```
+ base = source_() -> ... -> [base]cycle;
+ cycle = union()
+ -> ...
+ -> [next]cycle;
+```
+That is, we can trace a cycle of operators in the graph, where one operator is a `union()` that accepts two inputs, one of which is the "back edge" that closes the cycle.
+
+For a concrete example, we can revisit the code in the [Graph Reachability](../quickstart/example_5_reachability.mdx) quickstart program:
+
+{getLines(exampleCode, 7, 22)}
+
+The cycle in that program matches our rough pattern as follows:
+```
+ origin = source_iter(vec![0]) -> [base]reached_vertices;
+ reached_vertices = union() -> map(...)
+ -> [0]my_join_tee
+ -> ...
+ -> [next]reached_vertices;
+```
+
+How should we think about a cycle like this? Intuitively, we can think of the cycle beginning to compute on the data from `base` that comes in via `[0]cycle`. In the Graph Reachability example, this base data corresponds to the origin vertex, `0`. By joining [0] with the `stream_of_edges`, we generate neighbors (1 hop away) and pass them back into the cycle. When one of these is joined again with `stream_of_edges` we get a neighbor of a neighbor (2 hops away). When one of *these* is joined with `stream_of_edges` we get a vertex 3 hops away, and so on.
+
+If you prefer to think about this program as logic, it represents [Mathematical Induction](https://en.wikipedia.org/wiki/Mathematical_induction) via dataflow: the data from `base` going into `[0]cycle` (i.e. the origin vertex, 0 hops away) is like a "base case", and the data going into `[1]cycle` represents the "induction step" (a vertex *k+1* hops away). (A graph with multiple cycles represents multiple induction, which is a relatively uncommon design pattern in both mathematics and Hydroflow!)
+
+When does this process end? As with most Hydroflow questions, the answer is not in terms of control flow, but rather in terms of dataflow: *the cycle terminates when it produces no new data*, a notion called a [fixpoint](https://en.wikipedia.org/wiki/Fixed_point_(mathematics)). In our graph reachability example, the cycle terminates when there are no new vertices to visit. Note that the [`join()`](../syntax/surface_ops_gen#join) operator is defined over the *sets* of inputs on each side, and sets
+by definition do not contain duplicate values. This prevents the Reachability dataflow from regenerating the same value multiple times.
+
+Like many looping constructs, it is possible to write a cyclic Hydroflow program that never *terminates*, in the sense that it produces an unbounded stream of data. If we use [`join_multiset()`](../syntax/surface_ops_gen#join_multiset) instead of [`join()`](../syntax/surface_ops_gen#join) in our Reachability dataflow, the call to `flow.run_available()` never terminates, because each time the same vertex is visited, new data is generated!
+
+A simpler example of a non-terminating cycle is the following, which specifies the natural numbers:
+
+{exampleCode2}
+
+Like any sufficiently powerful language, Hydroflow cannot guarantee that your programs terminate. If you're debugging a non-terminating Hydroflow program, it's a good idea to identify the dataflow cycles and insert an
+[`inspect()`](../syntax/surface_ops_gen#inspect) operator along an edge of the cycle to see if it's generating unbounded amounts of duplicated data. You can use the [`unique()`](../syntax/surface_ops_gen#unique) operator to ameliorate the problem.
\ No newline at end of file
diff --git a/docs/docs/hydroflow/concepts/debugging.md b/docs/docs/hydroflow/concepts/debugging.md
index 083d1ec6ba0..1b1fc22b6d6 100644
--- a/docs/docs/hydroflow/concepts/debugging.md
+++ b/docs/docs/hydroflow/concepts/debugging.md
@@ -1,5 +1,5 @@
---
-sidebar_position: 4
+sidebar_position: 5
---
# Debugging
diff --git a/docs/docs/hydroflow/concepts/distributed_time.md b/docs/docs/hydroflow/concepts/distributed_time.md
index e410612535a..82f5e79c407 100644
--- a/docs/docs/hydroflow/concepts/distributed_time.md
+++ b/docs/docs/hydroflow/concepts/distributed_time.md
@@ -1,5 +1,5 @@
---
-sidebar_position: 3
+sidebar_position: 4
---
# Distributed Time
diff --git a/docs/docs/hydroflow/concepts/error_handling.md b/docs/docs/hydroflow/concepts/error_handling.md
index 7c28c29142e..3ec80599129 100644
--- a/docs/docs/hydroflow/concepts/error_handling.md
+++ b/docs/docs/hydroflow/concepts/error_handling.md
@@ -1,5 +1,5 @@
---
-sidebar_position: 5
+sidebar_position: 6
---
# Error Handling
diff --git a/docs/docs/hydroflow/concepts/stratification.md b/docs/docs/hydroflow/concepts/stratification.md
index 4d1042c810f..b06018b3bbb 100644
--- a/docs/docs/hydroflow/concepts/stratification.md
+++ b/docs/docs/hydroflow/concepts/stratification.md
@@ -1,5 +1,5 @@
---
-sidebar_position: 2
+sidebar_position: 3
---
# Streaming, Blocking and Stratification
diff --git a/docs/docs/hydroflow/index.mdx b/docs/docs/hydroflow/index.mdx
index f66b9e51909..1cf7a673b1d 100644
--- a/docs/docs/hydroflow/index.mdx
+++ b/docs/docs/hydroflow/index.mdx
@@ -11,7 +11,6 @@ import HydroflowDocs from '../../../hydroflow/README.md'
## This Book
This book will teach you how to set up your environment to get started with Hydroflow, and how to program in the Hydroflow surface syntax.
-Keep in mind that Hydroflow is under active development and is constantly
-changing. However the code in this book is tested with the Hydroflow library so should always be up-to-date.
+Keep in mind that Hydroflow is under active development. However the code in this book is tested with the Hydroflow library so should always be up-to-date.
If you have any questions, feel free to [create an issue on Github](https://github.com/hydro-project/hydroflow/issues/new).
diff --git a/docs/docs/hydroflow/quickstart/example_1_simplest.mdx b/docs/docs/hydroflow/quickstart/example_1_simplest.mdx
index e378d92e4e5..980ad8f6f62 100644
--- a/docs/docs/hydroflow/quickstart/example_1_simplest.mdx
+++ b/docs/docs/hydroflow/quickstart/example_1_simplest.mdx
@@ -62,12 +62,17 @@ item passed in.
The Hydroflow surface syntax is merely a *specification*; it does not actually do anything
until we run it.
-We run the flow from within Rust via the [`run_available()` method](https://hydro-project.github.io/hydroflow/doc/hydroflow/scheduled/graph/struct.Hydroflow.html#method.run_available).
+We can run this flow from within Rust via the [`run_available()` method](https://hydro-project.github.io/hydroflow/doc/hydroflow/scheduled/graph/struct.Hydroflow.html#method.run_available).
+
{getLines(exampleCode, 8)}
+
Note that `run_available()` runs the Hydroflow graph until no more work is immediately
available. In this example flow, running the graph drains the iterator completely, so no
more work will *ever* be available. In future examples we will use external inputs such as
-network ingress, in which case more work might appear at any time.
+network ingress, in which case more work might appear at any time. In those examples we may need a different method than `run_available()`,
+e.g. the [`run_async()`](https://hydro-project.github.io/hydroflow/doc/hydroflow/scheduled/graph/struct.Hydroflow.html#method.run_async) method,
+which we'll see
+in [the EchoServer example](./example_7_echo_server).
### A Note on Project Structure
The template project is intended to be a starting point for your own Hydroflow project, and you can add files and directories as you see fit. The only requirement is that the `src/main.rs` file exists and contains a `main()` function.
diff --git a/docs/docs/hydroflow/quickstart/example_2_simple.mdx b/docs/docs/hydroflow/quickstart/example_2_simple.mdx
index 9be2e31ae76..5e8cf286b71 100644
--- a/docs/docs/hydroflow/quickstart/example_2_simple.mdx
+++ b/docs/docs/hydroflow/quickstart/example_2_simple.mdx
@@ -61,6 +61,12 @@ Replace the contents of `src/main.rs` with the following:
{exampleCode2}
+Here the `filter_map` operator takes a map closure that returns a Rust [`Option`](https://doc.rust-lang.org/std/option/enum.Option.html).
+If the value is `Some(...)`, it is passed to the output; if it is `None` it is filtered.
+
+The `flat_map` operator takes a map closure that generates a collection type (in this case a `Vec`)
+which is flattened.
+
Results:
{extractOutput(exampleOutput2)}
diff --git a/docs/docs/hydroflow/quickstart/example_3_stream.mdx b/docs/docs/hydroflow/quickstart/example_3_stream.mdx
index 9a806d9bd97..8ec6856c876 100644
--- a/docs/docs/hydroflow/quickstart/example_3_stream.mdx
+++ b/docs/docs/hydroflow/quickstart/example_3_stream.mdx
@@ -12,7 +12,7 @@ import { getLines, extractOutput } from '../../../src/util';
> - the [`source_stream`](../syntax/surface_ops_gen.md#source_stream) operator that brings channel input into Hydroflow
> - Rust syntax to programmatically send data to a (local) channel
-In our previous examples, data came from within the Hydroflow spec, via Rust iterators and the [`source_iter`](../syntax/surface_ops_gen.md#source_iter) operator. In most cases, however, data comes from outside the Hydroflow spec. In this example, we'll see a simple version of this idea, with data being generated on the same machine and sent into the channel programmatically via Rust.
+In our previous examples, data came from within the Hydroflow spec, via Rust iterators and the [`source_iter`](../syntax/surface_ops_gen.md#source_iter) operator. In most cases, however, data comes from outside the Hydroflow spec. In this example, we'll see a simple version of this idea, with data being generated on the same thread and sent into the channel programmatically via Rust.
For discussion, we start with a skeleton much like before:
diff --git a/docs/docs/hydroflow/quickstart/example_4_neighbors.mdx b/docs/docs/hydroflow/quickstart/example_4_neighbors.mdx
index 29c4fb6e98b..2f755c35131 100644
--- a/docs/docs/hydroflow/quickstart/example_4_neighbors.mdx
+++ b/docs/docs/hydroflow/quickstart/example_4_neighbors.mdx
@@ -14,7 +14,7 @@ import { getLines, extractOutput, extractMermaid } from '../../../src/util';
> * The [`unique`](../syntax/surface_ops_gen.md#unique) operator for removing duplicates from a stream
> * Visualizing hydroflow code via `flow.meta_graph().expect(...).to_mermaid()`
-So far all the operators we've used have one input and one output and therefore
+So far, all the operators we've used have one input and one output and therefore
create a linear flow of operators. Let's now take a look at a Hydroflow program containing
an operator which has multiple inputs; in the following examples we'll extend this to
multiple outputs.
@@ -67,7 +67,7 @@ Run the program and focus on the last three lines of output, which come from `fl
That looks right: the edges we "sent" into the flow that start at `0` are
`(0, 1)` and `(0, 3)`, so the nodes reachable from `0` in 0 or 1 hops are `0, 1, 3`.
-> Note: When you run the program you may see the lines printed out in a different order. That's OK; the flow we're defining here is producing a `set` of nodes, so the order in which they are printed out is not specified. The [`sort_by_key`](../syntax/surface_ops_gen.md#sort_by_key) operator can be used to sort the output of a flow.
+> Note: When you run the program you may see the lines printed out in a different order. That's OK; the flow we're defining here uses the [`join()`](../syntax/surface_ops_gen.md#join) operator, which deals in `sets` of data items, so the order in which a `join()`'s output items are generated is not specified or guaranteed. The [`sort_by_key`](../syntax/surface_ops_gen.md#sort_by_key) operator can always be used to sort the output of a flow if needed.
## Examining the Hydroflow Code
In the code, we want to start out with the origin vertex, `0`,
@@ -123,7 +123,7 @@ to see the graph, which should look as follows:
You may be wondering why the nodes in the graph have different colors (and shapes, for readers who cannot distinguish
colors easily). The answer has nothing to do with the meaning of the program, only with the way that Hydroflow compiles
operators into Rust. Simply put, blue (wide-topped) boxes _pull_ data, yellow (wide-bottomed) boxes _push_ data, and the `handoff` is a special operator that buffers pushed data for subsequent pulling. Hydroflow always places a handoff
-between a push producer and a pull consumer, for reasons explained in the [Architecture](../architecture/index.md) chapter.
+between a push producer and a pull consumer, for reasons explained in the [Architecture](../architecture/index.mdx) chapter.
Returning to the code, if you read the `edges_send` calls carefully, you'll see that the example data
has vertices (`2`, `4`) that are more than one hop away from `0`, which were
diff --git a/docs/docs/hydroflow/quickstart/example_5_reachability.mdx b/docs/docs/hydroflow/quickstart/example_5_reachability.mdx
index e9834ff8c8e..364ff45c4f1 100644
--- a/docs/docs/hydroflow/quickstart/example_5_reachability.mdx
+++ b/docs/docs/hydroflow/quickstart/example_5_reachability.mdx
@@ -11,8 +11,7 @@ import { getLines, extractOutput, extractMermaid } from '../../../src/util';
> * Implementing a recursive algorithm (graph reachability) via cyclic dataflow
> * Operators to union data from multiple inputs ([`union`](../syntax/surface_ops_gen.md#union)), and send data to multiple outputs ([`tee`](../syntax/surface_ops_gen.md#tee))
> * Indexing multi-output operators by appending a bracket expression
-> * An example of how a cyclic dataflow in one stratum executes to completion before starting the next stratum.
-
+> * A first example of a cyclic (recursive) flow and the concept of fixpoint.
To expand from graph neighbors to graph reachability, we want to find vertices that are connected not just to `origin`,
but also to vertices reachable *transitively* from `origin`. Said differently, a vertex is reachable from `origin` if it is
@@ -67,19 +66,16 @@ We route the `origin` vertex into it as one input right away:
{getLines(exampleCode, 8, 12)}
-Note the square-bracket syntax for differentiating the multiple inputs to `union()`
-is the same as that of `join()` (except that union can have an unbounded number of inputs,
-whereas `join()` is defined to only have two.)
+Note the square-bracket syntax for assigning index names to the multiple inputs to `union()`; this is similar
+to the indexes for `join()`, except that (a) union can have an arbitrary number of inputs, (b) the index names can be arbitrary strings, and (c) the indexes are optional and can be omitted entirely. (By contrast, recall that
+`join()` is defined to take 2 required input indexes, `[0]` and `[1]`). The only reason to assign index names to the inputs of `union()` is for labeling edges in the generated (e.g. Mermaid) graphs.
-Now, `join()` is defined to only have one output. In our program, we want to copy the joined
-output to two places: to the original `for_each` from above to print output, and *also*
-back to the `union` operator we called `reached_vertices`. We feed the `join()` output
+The next group of statements lays out the join of `reached_vertices` and the `stream_of_edges`. The `join()` operator is defined to only have one output, but in our program, we need its output twice: once to feed the original `for_each` from above to print output, and also to feed
+back to the `union` operator that we called `reached_vertices`. We pass the `join()` output
through a `flat_map()` as before, and then we feed the result into a [`tee()`](../syntax/surface_ops_gen.md#tee) operator,
which is the mirror image of `union()`: instead of merging many inputs to one output,
-it copies one input to many different outputs. Each input element is _cloned_, in Rust terms, and
-given to each of the outputs. The syntax for the outputs of `tee()` mirrors that of union: we *append*
-an output index in square brackets to the `tee` or variable. In this example we have
-`my_join_tee[0] ->` and `my_join_tee[1] ->`.
+it copies one input to many different outputs. Each input element is _cloned_, in Rust terms, and a separate copy is given to each of the outputs. The syntax for the outputs of `tee()` mirrors that of the inputs to union: we can (optionally) *append*
+an arbitrary output index name in square brackets to the `tee` or variable. In this example we have `my_join_tee[cycle] ->` and `my_join_tee[print] ->`.
Finally, we process the output of the `join` as passed through the `tee`.
One branch pushes reached vertices back up into the `reached_vertices` variable (which begins with a `union`), while the other
@@ -87,9 +83,6 @@ prints out all the reached vertices as in the simple program.
{getLines(exampleCode, 14, 17)}
-Note the syntax for differentiating the *outputs* of a `tee()` is symmetric to that of `union()`,
-showing up to the right of the variable rather than the left.
-
Below is the diagram rendered by [mermaid](https://mermaid-js.github.io/) showing
the structure of the full flow:
@@ -97,4 +90,7 @@ the structure of the full flow:
This is similar to the flow for graph neighbors, but has a few more operators that make it look
more complex. In particular, it includes the `union` and `tee` operators, and a cycle-forming back-edge.
-There is also an auto-generated `handoff` operator that enforces the rule that a push producer and a pull consumer must be separated by a `handoff`.
+There is also an auto-generated `handoff` operator that enforces the rule that a push producer and a pull consumer must be separated by a `handoff` (see the [Architecture section](../architecture/handoffs)).
+
+# Cyclic Dataflow
+Many dataflow and workflow systems are restricted to acyclic graphs (DAGs), but Hydroflow supports cycles, as we see in this example.
\ No newline at end of file
diff --git a/docs/docs/hydroflow/quickstart/example_6_unreachability.mdx b/docs/docs/hydroflow/quickstart/example_6_unreachability.mdx
index 941543551c9..18a1cdbcd5d 100644
--- a/docs/docs/hydroflow/quickstart/example_6_unreachability.mdx
+++ b/docs/docs/hydroflow/quickstart/example_6_unreachability.mdx
@@ -11,6 +11,7 @@ import { getLines, extractOutput, extractMermaid } from '../../../src/util';
> * Extending a program with additional downstream logic.
> * Hydroflow's ([`difference`](../syntax/surface_ops_gen.md#difference)) operator
> * A first exposure to the concepts of _strata_ and _ticks_
+> * An example of how a cyclic dataflow in one stratum executes to completion before starting the next stratum.
Our next example builds on the previous by finding vertices that are _not_ reachable. To do this, we need to capture the set `all_vertices`, and use a [difference](../syntax/surface_ops_gen.md#difference) operator to form the difference between that set of vertices and `reachable_vertices`.
@@ -74,16 +75,17 @@ in order, one at a time, ensuring all values are computed
before moving on to the next stratum. Between strata we see a _handoff_, which logically buffers the
output of the first stratum, and delineates the separation of execution between the 2 strata.
+If you look carefully, you'll see two subgraphs labeled with `stratum 0`. The reason that stratum 0 was broken into subgraphs has nothing to do with
+correctness, but rather the way that Hydroflow graphs are compiled and scheduled (as
+discussed in the section on [In-Out Trees](../architecture/in-out_trees)). We need not concern ourselves with this detail other than to look carefully at the `stratum` labels on the grey boxes in our Mermaid diagrams.
+
All the subgraphs labeled `stratum 0` are run first to completion,
and then all the subgraphs labeled `stratum 1` are run. This captures the requirements of the `difference` operator: it has to wait for its full negative input before it can start producing output. Note
how the `difference` operator has two inputs (labeled `pos` and `neg`), and only the `neg` input shows up as blocking (with the bold edge ending in a ball).
-Meanwhile, note stratum 0 has a recursive loop, and stratum 1 that computes `difference`, with the blocking input. This means that Hydroflow will first run the loop of stratum 0 repeatedly until all the transitive reached vertices are found, before moving on to compute the unreached vertices.
+In this Mermaid graph, note that stratum 0 has a recursive loop back through `my_join`, and `tee`s off output to the `difference` operator in stratum 1 via the handoff and the blocking `neg` input. This means that Hydroflow will first run the loop of stratum 0 repeatedly until all the transitive reached vertices are passed to the handoff (a [fixpoint](../concepts/cyclic_flows)), before moving on to compute the unreached vertices via stratum 1.
-After all strata are run, Hydroflow returns to the stratum 0; this begins the next _tick_. This doesn't really matter for this example, but it is important for long-running Hydroflow services that accept input from the outside world. More on this topic in the chapter on [time](../concepts/life_and_times.md).
+After all strata are run, Hydroflow returns to stratum 0; this begins the next _tick_. This doesn't really matter for this example, but it is important for long-running Hydroflow services that accept input from the outside world. More on this topic in the chapter on [time](../concepts/life_and_times.md).
-If you look carefully, you'll see two subgraphs labeled with `stratum 0`. The reason that stratum 0 was broken into subgraphs has nothing to do with
-correctness, but rather the way that Hydroflow graphs are compiled and scheduled, as
-discussed in the chapter on [Architecture](../architecture/index.md).
diff --git a/docs/docs/hydroflow/quickstart/example_7_echo_server.mdx b/docs/docs/hydroflow/quickstart/example_7_echo_server.mdx
index 4f100b22ef2..48ef12b3e31 100644
--- a/docs/docs/hydroflow/quickstart/example_7_echo_server.mdx
+++ b/docs/docs/hydroflow/quickstart/example_7_echo_server.mdx
@@ -31,23 +31,9 @@ cargo generate hydro-project/hydroflow-template
```
Then change directory into the resulting project.
+The Hydroflow template project provides *this example* as its default, so there's no code for us to change.
The `README.md` for the template project is a good place to start. It contains a brief overview of the project structure, and how to build and run the example. Here we'll spend more time learning from the code.
-## Hydroflow project structure
-The Hydroflow template project auto-generates this example for us. If you prefer, you can find the source in the `examples/echo_server` directory of the Hydroflow repository.
-
-The directory structure encouraged by the template is as follows:
-```txt
-project/README.md # documentation
-project/Cargo.toml # package and dependency info
-project/src/main.rs # main function
-project/src/protocol.rs # message types exchanged between roles
-project/src/helpers.rs # helper functions used by all roles
-project/src/.rs # service definition for role A (e.g. server)
-project/src/.rs # service definition for role B (e.g. client)
-```
-In the default example, the roles we use are `Client` and `Server`, but you can imagine different roles depending on the structure of your service or application.
-
### `main.rs`
We start with a `main` function that parses command-line options, and invokes the appropriate
role-specific service.
@@ -61,7 +47,7 @@ Following that, we use Rust's [`clap`](https://docs.rs/clap/latest/clap/) (Comma
This sets up 3 command-line flags: `role`, `addr`, and `server_addr`. Note how the `addr` and `server_addr` flags are made optional via wrapping in a Rust `Option`; by contrast, the `role` option is required. The `clap` crate will parse the command-line options and populate the `Opts` struct with the values. `clap` handles parsing the command line strings into the associated Rust types -- the `value_parser` attribute tells `clap` to use Hydroflow's `ipv4_resolve` helper function to parse a string like "127.0.0.1:6552" into a `SocketAddr`.
-This brings us to the `main` function itself. It is prefaced by a `#[hydroflow::main]` attribute, which is a macro that sets up the tokio runtime for Hydroflow. This is necessary because Hydroflow uses the tokio runtime for asynchronous execution as a service.
+This brings us to the `main` function itself. It is prefaced by a `#[hydroflow::main]` attribute, which is a macro that sets up the tokio runtime for Hydroflow. It is also an async function. This is necessary because Hydroflow uses the tokio runtime for asynchronous execution as a service.
{getLines(main, 29, 40)}
diff --git a/docs/docs/hydroflow/quickstart/example_8_chat_server.mdx b/docs/docs/hydroflow/quickstart/example_8_chat_server.mdx
index 06d3c8c6c52..90dec2b370b 100644
--- a/docs/docs/hydroflow/quickstart/example_8_chat_server.mdx
+++ b/docs/docs/hydroflow/quickstart/example_8_chat_server.mdx
@@ -13,7 +13,7 @@ import { getLines } from '../../../src/util';
> * Multiple message types and the [`demux`](../syntax/surface_ops_gen.md#demux) operator.
> * A broadcast pattern via the [`cross_join`](../syntax/surface_ops_gen.md#cross_join) operator.
> * One-time bootstrapping pipelines
-> * A "gated buffer" pattern via `cross_join` with a single-object input.
+> * A "gated buffer" using [`defer_signal`](../syntax/surface_ops_gen.md#defer_signal) and [`persist`](../syntax/surface_ops_gen.md#persist) operators
Our previous [echo server](./example_7_echo_server) example was admittedly simplistic. In this example, we'll build something a bit more useful: a simple chat server. We will again have two roles: a `Client` and a `Server`. `Clients` will register their presence with the `Server`, which maintains a list of clients. Each `Client` sends messages to the `Server`, which will then broadcast those messages to all other clients.
@@ -53,18 +53,21 @@ To follow along, replace the contents of `src/server.rs` with the code below:
{getLines(server, 1, 24)}
-After a short prelude, we have the Hydroflow code near the top of `run_server()`. It begins by defining `outbound_chan` as a `union`d destination sink for network messages. Then we get to the
+After a short prelude, we have the Hydroflow code near the top of `run_server()`. It begins by defining `outbound_chan` as a `union`ed destination sink for network messages. Then we get to the
more interesting `inbound_chan` definition.
The `inbound` channel is a source stream that will carry many
-types of `Message`s. We use the [`demux`](../syntax/surface_ops_gen.md#demux) operator to partition the stream objects into three channels. The `clients` channel
+types of `Message`s.
+We first use a `map` operator to `unwrap` the Rust `Result` type that comes from deserializing the input
+from `source_stream_serde`.
+Then we use the [`demux`](../syntax/surface_ops_gen.md#demux) operator to partition the stream objects into three channels. The `clients` channel
will carry the addresses of clients that have connected to the server. The `msgs` channel will carry the `ChatMsg` messages that clients send to the server.
The `errs` channel will carry any other messages that clients send to the server.
Note the structure of the `demux` operator: it takes a closure on
-`(Message, SocketAddr)` pairs, and a variadic tuple (`var_args!`) of output channel names—in this case `clients`, `msgs`, and `errs`. The closure is basically a big
+`(Message, SocketAddr)` pairs, and a variadic tuple (`var_args!`) of the output channel names—in this case `clients`, `msgs`, and `errs`. The closure is basically a big
Rust pattern [`match`](https://doc.rust-lang.org/book/ch06-02-match.html), with one arm for each output channel name given in the variadic tuple. Note
-that the different output channels can have different-typed messages! Note also that we destructure the incoming `Message` types into tuples of fields. (If we didn't we'd either have to write boilerplate code for each message type in every downstream pipeline, or face Rust's dreaded [refutable pattern](https://doc.rust-lang.org/book/ch18-02-refutability.html) error!)
+that each output channel can have its own message type! Note also that we destructure the incoming `Message` types into component fields. (If we didn't we'd have to write boilerplate code to handle every possible `Message` type in every downstream pipeline!)
The remainder of the server consists of two independent pipelines, the code to print out the flow graph,
and the code to run the flow graph. To follow along, paste the following into the bottom of your `src/server.rs` file:
@@ -139,9 +142,8 @@ end
### `client.rs`
The chat client is not very different from the echo server client, with two new design patterns:
- 1. a degenerate `source_iter` pipeline that runs once
-as a "bootstrap" in the first tick
- 2. the use of `cross_join` as a "gated buffer" to postpone sending messages.
+ 1. an `initialize` operator that runs once to "bootstrap" actions in the first tick
+ 2. the use of `defer_signal` and `persist` as a "gated buffer" to postpone sending messages.
We also include a Rust helper routine `pretty_print_msg` for formatting output.
@@ -152,7 +154,7 @@ To follow along, start by replacing the contents of `src/client.rs` with the fol
{getLines(client, 1, 27)}
This brings us to the `run_client` function. As in `run_server` we begin by ensuring the server address
-is supplied. We then have the hydroflow code starting with a standard pattern of a `union`d `outbound_chan`,
+is supplied. We then have the hydroflow code starting with a standard pattern of a `union`ed `outbound_chan`,
and a `demux`ed `inbound_chan`. The client handles only two inbound `Message` types: `Message::ConnectResponse` and `Message::ChatMsg`.
Paste the following to the bottom of `src/client.rs`:
@@ -165,15 +167,15 @@ bottom of your `src/client.rs` file.
{getLines(client, 47, 64)}
1. The first pipeline is the "bootstrap" alluded to above.
-It starts with `source_iter` operator that emits a single, opaque "unit" (`()`) value. This value is available when the client begins, which means
+It starts with the `initialize` operator that emits a single, opaque "unit" (`()`) value. This value is emitted when the client begins, which means
this pipeline runs once, immediately on startup, and generates a single `ConnectRequest` message which is sent to the server.
-2. The second pipeline reads from `source_stdin` and sends messages to the server. It differs from our echo-server example in the use of a `cross_join`
-with `inbound_chan[acks]`. This cross-join is similar to that of the server: it forms pairs between all messages and all servers that send a `ConnectResponse` ack.
-In principle this means that the client is broadcasting each message to all servers.
-In practice, however, the client establishes at most one connection to a server. Hence over time, this pipeline starts with zero `ConnectResponse`s and is sending no messages;
-subsequently it receives a single `ConnectResponse` and starts sending messages. The `cross_join` is thus effectively a buffer for messages, and a "gate" on that buffer that opens
-when the client receives its sole `ConnectResponse`.
+2. The second pipeline reads from `source_stdin` and sends messages to the server. It differs from our echo-server example in the use of the [`defer_signal`](../syntax/surface_ops_gen.md#defer_signal) operator, which buffers up messages until a `ConnectResponse` is received. The flow assigned to the `lines`
+variable takes chat messages from stdin and passes them to the `[input]` channel of the `defer_signal`.
+The `defer_signal` operator buffers these messages until it gets an input on its `[signal]` channel. Then all `[input]` data buffered from previous ticks is passed along to the output, along with any data that streams in during the current tick.
+In our chat example, we want messages to be sent to the server in *all subsequent ticks* after `ConnectResponse` is received! To enforce this, we need to send something on the `[signal]` channel of `defer_signal` every subsequent tick. We achieve this by interposing a `persist` between `inbound_chan[acks]` and `[signal]msg_send`. The [`persist`](../syntax/surface_ops_gen.md#persist) operator stores its input data in order across time, and replays its current contents
+each tick. In this case it is storing `ConnectResponse` messages, of which we expect only one. The
+`persist` op will replay this signal every tick after it is received, so the client will always send its messages to the server once connected.
3. The final pipeline simply pretty-prints the messages received from the server.
@@ -275,7 +277,7 @@ May 31, 5:12:40 alice: Is there anyone home?
Now start client "bob" in terminal 3, and notice how he instantly receives the backlog of Alice's messages from the server's `cross_join`.
(The messages may not be printed in the same order as they were timestamped! The `cross_join` operator is not guaranteed to preserve order, nor
-is the udp network. Fixing these issues requires extra client logic that we leave as an exercise to the reader.)
+is the UDP network. Fixing these issues requires extra client logic (perhaps using the [`sort()`](../syntax/surface_ops_gen.md#sort) operator) that we leave as an exercise to the reader.)
```console
#shell-command-next-line
cargo run -- --name "bob" --role client --server-addr 127.0.0.1:12347
diff --git a/docs/docs/hydroflow/quickstart/setup.md b/docs/docs/hydroflow/quickstart/setup.md
index 08b76be74e0..f3a2c477688 100644
--- a/docs/docs/hydroflow/quickstart/setup.md
+++ b/docs/docs/hydroflow/quickstart/setup.md
@@ -47,9 +47,7 @@ cargo install cargo-generate
## VS Code Setup
We recommend using VS Code with the `rust-analyzer` extension (and NOT the
-`Rust` extension). To enable the pre-release version of `rust-analyzer`
-(required by some new nightly syntax we use, at least until 2022-03-14), click
-the "Switch to Pre-Release Version" button next to the uninstall button.
+`Rust` extension).
## Setting up a Hydroflow Project
The easiest way to get started with Hydroflow is to begin with a template project.
@@ -122,9 +120,10 @@ will provide inline type and error messages, code completion, etc.
To work with the repository, it's best to start with an "example", found in the
[`hydroflow/examples` folder](https://github.com/hydro-project/hydroflow/tree/main/hydroflow/examples).
-These examples are included via the [`hydroflow/Cargo.toml` file](https://github.com/hydro-project/hydroflow/blob/main/hydroflow/Cargo.toml),
-so make sure to add your example there if you create a new one. The simplest
-example is the [`echo server`](https://github.com/hydro-project/hydroflow/blob/main/hydroflow/examples/echoserver/main.rs).
+The simplest example is the
+['hello world'](https://github.com/hydro-project/hydroflow/blob/main/hydroflow/examples/hello_world/main.rs) example;
+the simplest example with networking is the
+[`echo server`](https://github.com/hydro-project/hydroflow/blob/main/hydroflow/examples/echoserver/main.rs).
The Hydroflow repository is set up as a [workspace](https://doc.rust-lang.org/book/ch14-03-cargo-workspaces.html),
i.e. a repo containing a bunch of separate packages, `hydroflow` is just the
diff --git a/docs/docs/hydroflow/syntax/index.mdx b/docs/docs/hydroflow/syntax/index.mdx
index bbaf37cc0e8..458a2800ff3 100644
--- a/docs/docs/hydroflow/syntax/index.mdx
+++ b/docs/docs/hydroflow/syntax/index.mdx
@@ -4,7 +4,7 @@ import exampleCode from '!!raw-loader!../../../../hydroflow/examples/example_syn
# Hydroflow Surface Syntax
The natural way to write a Hydroflow program is using the _Surface Syntax_ documented here.
It is a chained `Iterator`-style syntax of operators built into Hydroflow that should be sufficient
-for most uses. If you want lower-level access you can work with the `Core API` documented in the [Architecture](../architecture/index) section.
+for most uses. If you want lower-level access you can work with the `Core API` documented in the [Architecture](../architecture/) section.
In this chapter we go over the syntax piece by piece: how to [embed surface syntax in Rust](./surface_embedding) and how to specify [_flows_](./surface_flows), which consist of [_data sources_](./surface_data) flowing through [_operators_](./surface_ops_gen).
diff --git a/docs/docs/hydroflow/syntax/surface_data.mdx b/docs/docs/hydroflow/syntax/surface_data.mdx
index 6e6d3a18e42..66a27b4c99d 100644
--- a/docs/docs/hydroflow/syntax/surface_data.mdx
+++ b/docs/docs/hydroflow/syntax/surface_data.mdx
@@ -6,20 +6,27 @@ import exampleCode from '!!raw-loader!../../../../hydroflow/examples/example_syn
# Data Sources and Sinks in Rust
Any useful flow requires us to define sources of data, either generated computationally or received from
-and outside environment via I/O.
+an outside environment via I/O.
-## `source_iter`
-A flow can receive data from a Rust collection object via the `source_iter` operator, which takes the
+## One-time Iterator Sources
+A flow can receive data from a Rust collection object via the [`source_iter()`](./surface_ops_gen.md#source_iter) operator, which takes the
iterable collection as an argument and passes the items down the flow.
For example, here we iterate through a vector of `usize` items and push them down the flow:
```rust,ignore
source_iter(vec![0, 1]) -> ...
```
-The Hello, World example above uses this construct.
+The Hello, World example above uses this construct.
-## `source_stream`
+The [`source_file()`](./surface_ops_gen.md#source_file) and [`source_json()`](./surface_ops_gen.md#source_json) operators are similar, but read from a specified file.
+
+All of these operators output the contents of their collection only once, during the first tick.
+To output every tick, consider feeding results into a [`persist()`](./surface_ops_gen.md#persist)
+operator.
+
+## Streaming Sources
More commonly, a flow should handle external data coming in asynchronously from a [_Tokio_ runtime](https://tokio.rs/tokio/tutorial).
-One way to do this is with _channels_ that allow Rust code to send data into the Hydroflow inputs.
+
+One way to do this is with _channels_ that allow Rust code to send data into Hydroflow via the [`source_stream()`](./surface_ops_gen.md#source_stream) operator.
The code below creates a channel for data of (Rust) type `(usize, usize)`:
```rust,ignore
let (input_send, input_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>();
@@ -30,7 +37,7 @@ it explicitly as follows:
```rust,ignore
input_send.send((0, 1)).unwrap()
```
-And in our Hydroflow syntax we can receive the data from the channel using the `source_stream` syntax and
+And in our Hydroflow syntax we can receive the data from the channel using the [`source_stream()`](./surface_ops_gen.md#source_stream) operator and
pass it along a flow:
```rust,ignore
source_stream(input_recv) -> ...
@@ -41,4 +48,19 @@ in from outside the flow:
{exampleCode}
-TODO: add the remaining sources.
\ No newline at end of file
+Sometimes we want to trigger activity based on timing, not data. To achieve this, we can use the [`source_interval()`](./surface_ops_gen.md#source_interval) operator, which takes a `Duration` `d` as an argument, and outputs a Tokio time Instant after every `d` units of time pass.
+
+## Destinations
+As duals to our data source operators, we also have data destination operators. The dest operators you'll likely use
+most often are [`dest_sink()`](./surface_ops_gen.md#dest_sink) and [`dest_file()`](./surface_ops_gen.md#dest_file). They are fairly
+straightforward, so the best source for further information is the documentation you can find by following the links on the operator names above.
+
+## SerDe: Network Serialization and Deserialization
+One of the mechanical annoyances of networked systems is the need to convert data to wire format ("serialization") and convert it back from wire format to data ("deserialization"),
+also known as "SerDe".
+This can be done with `map` functions, but we provide a convenience source/sink pair that does serde and networking for you.
+The source side, [`source_serde()`](./surface_ops_gen.md#source_serde) generates tuples of the type `(T, SocketAddr)`,
+where the first field is a deserialized item of type `T`, and the second field is the address of the sender of the item.
+The dest side, [`dest_sink_serde()`](./surface_ops_gen.md#dest_sink_serde), takes in tuples of type `(T, SocketAddr)`,
+where the first field is an item of type `T` to be serialized, and the second field is a destination address.
+
diff --git a/docs/docs/hydroflow/syntax/surface_flows.md b/docs/docs/hydroflow/syntax/surface_flows.md
index 2f56d55a970..1d55b2c756d 100644
--- a/docs/docs/hydroflow/syntax/surface_flows.md
+++ b/docs/docs/hydroflow/syntax/surface_flows.md
@@ -23,15 +23,15 @@ data, making the program more understandable.
## Operators with Multiple Ports
Some operators have more than one input _port_ that can be referenced by `->`. For example [`union`](./surface_ops_gen.md#union)
-unions the contents of many flows, so it can have an abitrary number of input ports. Some operators have multiple outputs, notably [`tee`](./surface_ops_gen.md#tee),
-which has an arbitrary number of outputs.
+unions the contents of many flows, so it can have an arbitrary number of input ports. Some operators have multiple outputs; [`tee`](./surface_ops_gen.md#tee) and [`demux`](./surface_ops_gen.md#demux)
+have an arbitrary number of outputs.
In the syntax, we optionally distinguish input ports via an _indexing prefix_ string
-in square brackets before the name (e.g. `[0]my_union` and `[1]my_union`). Binary operators ---
-those with two distinct input ports --- require indexing prefixes, and require them to be `0` and `1`.
-Operators with arbitrary numbers of inputs ([`union`](./surface_ops_gen.md#union)) and outputs
+in square brackets before the name (e.g. `[0]my_union` and `[1]my_union`). Most operators with a fixed number of input ports require specific indexing prefixes to
+distinguish the inputs. For example, the inputs to [`join`](./surface_ops_gen.md#join) must be `[0]` and `[1]`; the inputs to [`difference`](./surface_ops_gen.md#difference) must be `[pos]` and `[neg]`.
+Operators with an arbitrary number of inputs ([`union`](./surface_ops_gen.md#union)) and outputs
([`demux`](./surface_ops_gen.md#demux), [`tee`](./surface_ops_gen.md#tee))
-allow for arbitrary strings, which can make code and dataflow graphs more readable and understandable
+allow you to choose arbitrary strings, which help you make your code and dataflow graphs more readable and understandable
(e.g. `my_tee[print]` and `my_tee[continue]`).
Here is an example that tees one flow into two, handles each separately, and then unions them to print out the contents in both lowercase and uppercase:
diff --git a/docs/docs/hydroflow/todo.md b/docs/docs/hydroflow/todo.md
index ddac14b7fed..b10714f83fd 100644
--- a/docs/docs/hydroflow/todo.md
+++ b/docs/docs/hydroflow/todo.md
@@ -56,21 +56,23 @@ sidebar_position: 8
- cargo generate for templating
- Hydroflow program specs embedded in Rust
- Tokio Channels and how to use them in Hydroflow
- - Network sources and sinks (source_stream)
- - Built-in serde (source_stream_serde, dest_sink_serde)
+ - Network sources and sinks (`source_stream`)
+ - Built-in serde (`source_stream_serde`, `dest_sink_serde`)
- Hydroflow syntax: operators, ->, variables, indexing multi-input/output operators
- running Hydroflow via `run_available` and `run_async`
- Recursion via cyclic dataflow
- Fixpoints and Strata
- - Template structure: clap, message types
- - source_stdin
+ - Template structure: `clap`, message types
+ - `source_stdin`
- Messages and `demux`
- broadcast pattern
- - gated buffer pattern
- - bootstrapping pipelines
+ - the `persist` operator to store and replay dataflow
+ - the `defer_signal` operator to gate a dataflow
+ - bootstrapping pipelines: `initialize`
- Operators covered
- cross_join
+ - defer_signal
- demux
- dest_sink_serde
- difference
@@ -79,12 +81,15 @@ sidebar_position: 8
- flatten
- flat_map
- for_each
+ - initialize
- join
- map
+ - persist
- union
- source_iter
- source_stdin
- source_stream
- source_stream_serde
- tee
+ - union
- unique
diff --git a/docs/docusaurus.config.js b/docs/docusaurus.config.js
index 1a1e5be7845..2b37bfc6e5d 100644
--- a/docs/docusaurus.config.js
+++ b/docs/docusaurus.config.js
@@ -27,6 +27,10 @@ const config = {
onBrokenLinks: 'throw',
onBrokenMarkdownLinks: 'throw',
+ customFields: {
+ 'LOAD_PLAYGROUND': process.env.LOAD_PLAYGROUND || false,
+ },
+
markdown: {
mermaid: true
},
diff --git a/docs/src/pages/playground.js b/docs/src/pages/playground.js
index e519d3e24ac..684181f3b46 100644
--- a/docs/src/pages/playground.js
+++ b/docs/src/pages/playground.js
@@ -5,27 +5,37 @@ import Editor from "@monaco-editor/react";
import ExecutionEnvironment from '@docusaurus/ExecutionEnvironment';
+import siteConfig from '@generated/docusaurus.config';
+
import * as wasm from "website_playground/website_playground_bg.wasm";
-import { __wbg_set_wasm, init, compile_hydroflow, compile_datalog } from "website_playground/website_playground_bg.js";
+import * as playgroundJS from "website_playground/website_playground_bg.js";
+
+let compile_hydroflow = null;
+let compile_datalog = null;
-if (ExecutionEnvironment.canUseDOM) {
- __wbg_set_wasm(wasm);
-} else {
- const wasmUri = require("website_playground/website_playground_bg.wasm");
- const wasmBuffer = Buffer.from(wasmUri.split(",")[1], 'base64');
- const wasm = new WebAssembly.Module(wasmBuffer);
- const instance = new WebAssembly.Instance(wasm, {
- "./website_playground_bg.js": require("website_playground/website_playground_bg.js")
- });
- __wbg_set_wasm(instance.exports);
+if (siteConfig.customFields.LOAD_PLAYGROUND === '1') {
+ compile_hydroflow = playgroundJS.compile_hydroflow;
+ compile_datalog = playgroundJS.compile_datalog;
+
+ if (ExecutionEnvironment.canUseDOM) {
+ playgroundJS.__wbg_set_wasm(wasm);
+ } else {
+ const wasmUri = require("website_playground/website_playground_bg.wasm");
+ const wasmBuffer = Buffer.from(wasmUri.split(",")[1], 'base64');
+ const wasm = new WebAssembly.Module(wasmBuffer);
+ const instance = new WebAssembly.Instance(wasm, {
+ "./website_playground_bg.js": require("website_playground/website_playground_bg.js")
+ });
+ playgroundJS.__wbg_set_wasm(instance.exports);
+ }
+
+ playgroundJS.init();
}
import mermaid from "mermaid";
import styles from "./playground.module.css";
-init();
-
function MermaidGraph({ id, source }) {
const [svg, setSvg] = useState({ __html: 'Loading Mermaid graph...' });
useEffect(() => {
@@ -50,7 +60,7 @@ source_iter(0..10) -> for_each(|n| println!("Hello {}", n));`,
// https://hydro.run/docs/hydroflow/quickstart/example_2_simple
source_iter(0..10)
-> map(|n| n * n)
- -> filter(|&n| n > 10)
+ -> filter(|n| *n > 10)
-> map(|n| (n..=n+1))
-> flatten()
-> for_each(|n| println!("Howdy {}", n));`,
@@ -179,6 +189,10 @@ export function EditorDemo({ compileFn, examples, mermaidId }) {
const [showingMermaid, setShowingMermaid] = useState(true);
const [editorAndMonaco, setEditorAndMonaco] = useState(null);
+ if (siteConfig.customFields.LOAD_PLAYGROUND !== '1') {
+ return <div>Please set LOAD_PLAYGROUND environment variable to 1 to enable the playground.
+ </div>;
+ }
+
const { output, diagnostics } = (compileFn)(program);
const numberOfLines = program.split("\n").length;
diff --git a/docs/wasm-plugin.js b/docs/wasm-plugin.js
index a3dd8c8b945..16181e5850e 100644
--- a/docs/wasm-plugin.js
+++ b/docs/wasm-plugin.js
@@ -14,7 +14,15 @@ module.exports = function (context, options) {
type: "asset/inline",
},
] : []
- }
+ },
+ ...(process.env.LOAD_PLAYGROUND !== "1" ? {
+ resolve: {
+ alias: {
+ "website_playground/website_playground_bg.wasm": false,
+ "website_playground/website_playground_bg.js": false
+ }
+ }
+ } : {})
};
},
};
diff --git a/hydro_cli/CHANGELOG.md b/hydro_cli/CHANGELOG.md
index 387ff162005..750b836ec7f 100644
--- a/hydro_cli/CHANGELOG.md
+++ b/hydro_cli/CHANGELOG.md
@@ -1,7 +1,69 @@
+# Changelog
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
+and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+
+## 0.4.0 (2023-08-15)
+
+### Chore
+
+ - fix lints for latest nightly
+
+### Commit Statistics
+
+
+
+ - 1 commit contributed to the release over the course of 27 calendar days.
+ - 42 days passed between releases.
+ - 1 commit was understood as [conventional](https://www.conventionalcommits.org).
+ - 1 unique issue was worked on: [#844](https://github.com/hydro-project/hydroflow/issues/844)
+
+### Commit Details
+
+
+
+view details
+
+ * **[#844](https://github.com/hydro-project/hydroflow/issues/844)**
+ - Fix lints for latest nightly ([`949db02`](https://github.com/hydro-project/hydroflow/commit/949db02e9fa9878e1a7176c180d6f44c5cddf052))
+
+
+## 0.3.0 (2023-07-04)
+
+
+
+Unchanged from previous release.
+
+### Chore
+
+ - mark hydro_cli as unchanged for 0.3 release
+
+### Commit Statistics
+
+
+
+ - 2 commits contributed to the release.
+ - 33 days passed between releases.
+ - 1 commit was understood as [conventional](https://www.conventionalcommits.org).
+ - 0 issues like '(#ID)' were seen in commit messages
+
+### Commit Details
+
+
+
+view details
+
+ * **Uncategorized**
+ - Release hydroflow_cli_integration v0.3.0, hydroflow_lang v0.3.0, hydroflow_datalog_core v0.3.0, hydroflow_datalog v0.3.0, hydroflow_macro v0.3.0, lattices v0.3.0, pusherator v0.0.2, hydroflow v0.3.0, hydro_cli v0.3.0, safety bump 5 crates ([`ec9633e`](https://github.com/hydro-project/hydroflow/commit/ec9633e2e393c2bf106223abeb0b680200fbdf84))
+ - Mark hydro_cli as unchanged for 0.3 release ([`4c2cf81`](https://github.com/hydro-project/hydroflow/commit/4c2cf81411835529b5d7daa35717834e46e28b9b))
+
## v0.2.0 (2023-05-31)
+
+
### Chore
- manually bump versions for v0.2.0 release
@@ -14,7 +76,7 @@
- - 2 commits contributed to the release.
+ - 3 commits contributed to the release.
- 1 day passed between releases.
- 2 commits were understood as [conventional](https://www.conventionalcommits.org).
- 1 unique issue was worked on: [#723](https://github.com/hydro-project/hydroflow/issues/723)
@@ -28,6 +90,7 @@
* **[#723](https://github.com/hydro-project/hydroflow/issues/723)**
- Add more detailed Hydro Deploy docs and rename `ConnectedBidi` => `ConnectedDirect` ([`8b2c9f0`](https://github.com/hydro-project/hydroflow/commit/8b2c9f09b1f423ac6d562c29d4ea587578f1c98a))
* **Uncategorized**
+ - Release hydroflow_lang v0.2.0, hydroflow_datalog_core v0.2.0, hydroflow_datalog v0.2.0, hydroflow_macro v0.2.0, lattices v0.2.0, hydroflow v0.2.0, hydro_cli v0.2.0 ([`ca464c3`](https://github.com/hydro-project/hydroflow/commit/ca464c32322a7ad39eb53e1794777c849aa548a0))
- Manually bump versions for v0.2.0 release ([`fd896fb`](https://github.com/hydro-project/hydroflow/commit/fd896fbe925fbd8ef1d16be7206ac20ba585081a))
@@ -41,6 +104,8 @@
+
+
### Chore
@@ -73,7 +138,7 @@
- initialize hydro_cli/CHANGELOG.md
- publish hydro_cli
- Will bump versions for python deploy.
+ Will bump versions for python deploy.
Update build-cli.yml to publish on hydro_cli release
### Style
diff --git a/hydro_cli/Cargo.toml b/hydro_cli/Cargo.toml
index d1c173a2051..125aed7f3b0 100644
--- a/hydro_cli/Cargo.toml
+++ b/hydro_cli/Cargo.toml
@@ -1,7 +1,7 @@
[package]
name = "hydro_cli"
publish = true
-version = "0.2.0"
+version = "0.4.0"
edition = "2021"
license = "Apache-2.0"
documentation = "https://docs.rs/hydro_cli/"
@@ -37,8 +37,8 @@ bytes = "1.1.0"
nanoid = "0.4.0"
ctrlc = "3.2.5"
nix = "0.26.2"
-hydroflow_cli_integration = { path = "../hydroflow_cli_integration", version = "^0.2.0" }
-indicatif = "0.17.3"
+hydroflow_cli_integration = { path = "../hydroflow_cli_integration", version = "^0.3.0" }
+indicatif = "0.17.6"
cargo_metadata = "0.15.4"
[dev-dependencies]
diff --git a/hydro_cli/hydro/_core.pyi b/hydro_cli/hydro/_core.pyi
index 7c0f491c9c7..5f561ce846a 100644
--- a/hydro_cli/hydro/_core.pyi
+++ b/hydro_cli/hydro/_core.pyi
@@ -20,7 +20,7 @@ class Deployment(object):
def CustomService(self, on: "Host", external_ports: List[int]) -> "CustomService": ...
- def HydroflowCrate(self, src: str, on: "Host", example: Optional[str] = None, profile: Optional[str] = None, features: Optional[List[str]] = None, args: Optional[List[str]] = None, display_id: Optional[str] = None, external_ports: Optional[List[int]] = None) -> "HydroflowCrate": ...
+ def HydroflowCrate(self, src: str, on: "Host", bin: Optional[str] = None, example: Optional[str] = None, profile: Optional[str] = None, features: Optional[List[str]] = None, args: Optional[List[str]] = None, display_id: Optional[str] = None, external_ports: Optional[List[int]] = None) -> "HydroflowCrate": ...
async def deploy(self): ...
diff --git a/hydro_cli/src/core/custom_service.rs b/hydro_cli/src/core/custom_service.rs
index 98cf030ca25..a55eecb4d70 100644
--- a/hydro_cli/src/core/custom_service.rs
+++ b/hydro_cli/src/core/custom_service.rs
@@ -65,7 +65,9 @@ impl Service for CustomService {
Ok(())
}
- async fn start(&mut self) {}
+ async fn start(&mut self) -> Result<()> {
+ Ok(())
+ }
async fn stop(&mut self) -> Result<()> {
Ok(())
diff --git a/hydro_cli/src/core/deployment.rs b/hydro_cli/src/core/deployment.rs
index 331470c7031..11390a27467 100644
--- a/hydro_cli/src/core/deployment.rs
+++ b/hydro_cli/src/core/deployment.rs
@@ -17,7 +17,7 @@ pub struct Deployment {
impl Deployment {
pub async fn deploy(&mut self) -> Result<()> {
- progress::ProgressTracker::with_group("deploy", || async {
+ progress::ProgressTracker::with_group("deploy", None, || async {
let mut resource_batch = super::ResourceBatch::new();
let active_services = self
.services
@@ -41,7 +41,7 @@ impl Deployment {
}
let result = Arc::new(
- progress::ProgressTracker::with_group("provision", || async {
+ progress::ProgressTracker::with_group("provision", None, || async {
resource_batch
.provision(&mut self.resource_pool, self.last_resource_result.clone())
.await
@@ -50,7 +50,7 @@ impl Deployment {
);
self.last_resource_result = Some(result.clone());
- progress::ProgressTracker::with_group("provision", || {
+ progress::ProgressTracker::with_group("provision", None, || {
let hosts_provisioned =
self.hosts
.iter_mut()
@@ -61,7 +61,7 @@ impl Deployment {
})
.await;
- progress::ProgressTracker::with_group("deploy", || {
+ progress::ProgressTracker::with_group("deploy", None, || {
let services_future =
self.services
.iter_mut()
@@ -79,7 +79,7 @@ impl Deployment {
})
.await;
- progress::ProgressTracker::with_group("ready", || {
+ progress::ProgressTracker::with_group("ready", None, || {
let all_services_ready =
self.services
.iter()
@@ -97,7 +97,7 @@ impl Deployment {
.await
}
- pub async fn start(&mut self) {
+ pub async fn start(&mut self) -> Result<()> {
let active_services = self
.services
.iter()
@@ -110,10 +110,12 @@ impl Deployment {
self.services
.iter()
.map(|service: &Weak<RwLock<dyn Service>>| async {
- service.upgrade().unwrap().write().await.start().await;
+ service.upgrade().unwrap().write().await.start().await?;
+ Ok(()) as Result<()>
});
- futures::future::join_all(all_services_start).await;
+ futures::future::try_join_all(all_services_start).await?;
+ Ok(())
}
pub fn add_host T>(
diff --git a/hydro_cli/src/core/gcp.rs b/hydro_cli/src/core/gcp.rs
index aabbc066d9e..1dd250f838b 100644
--- a/hydro_cli/src/core/gcp.rs
+++ b/hydro_cli/src/core/gcp.rs
@@ -92,35 +92,44 @@ impl LaunchedSSHHost for LaunchedComputeEngine {
22,
);
- let res = ProgressTracker::leaf(
- format!(
- "connecting to host @ {}",
- self.external_ip.as_ref().unwrap()
- ),
- async_retry(
- &|| async {
- let mut config = SessionConfiguration::new();
- config.set_compress(true);
-
- let mut session =
- AsyncSession::::connect(target_addr, Some(config)).await?;
-
- session.handshake().await?;
-
- session
- .userauth_pubkey_file(
- self.user.as_str(),
- None,
- self.ssh_key_path().as_path(),
- None,
- )
- .await?;
-
- Ok(session)
- },
- 10,
- Duration::from_secs(1),
- ),
+ let mut attempt_count = 0;
+
+ let res = async_retry(
+ || {
+ attempt_count += 1;
+ ProgressTracker::leaf(
+ format!(
+ "connecting to host @ {} (attempt: {})",
+ self.external_ip.as_ref().unwrap(),
+ attempt_count
+ ),
+ async {
+ let mut config = SessionConfiguration::new();
+ config.set_compress(true);
+
+ let mut session =
+ AsyncSession::::connect(target_addr, Some(config)).await?;
+
+ tokio::time::timeout(Duration::from_secs(15), async move {
+ session.handshake().await?;
+
+ session
+ .userauth_pubkey_file(
+ self.user.as_str(),
+ None,
+ self.ssh_key_path().as_path(),
+ None,
+ )
+ .await?;
+
+ Ok(session)
+ })
+ .await?
+ },
+ )
+ },
+ 10,
+ Duration::from_secs(1),
)
.await?;
diff --git a/hydro_cli/src/core/hydroflow_crate/build.rs b/hydro_cli/src/core/hydroflow_crate/build.rs
index 5833299afdc..e837566c1a3 100644
--- a/hydro_cli/src/core/hydroflow_crate/build.rs
+++ b/hydro_cli/src/core/hydroflow_crate/build.rs
@@ -16,6 +16,7 @@ type CacheKey = (
PathBuf,
Option,
Option,
+ Option,
HostTargetType,
Option>,
);
@@ -27,6 +28,7 @@ static BUILDS: Lazy>>>> =
pub async fn build_crate(
src: PathBuf,
+ bin: Option,
example: Option,
profile: Option,
target_type: HostTargetType,
@@ -34,6 +36,7 @@ pub async fn build_crate(
) -> Result {
let key = (
src.clone(),
+ bin.clone(),
example.clone(),
profile.clone(),
target_type,
@@ -56,6 +59,10 @@ pub async fn build_crate(
profile.unwrap_or("release".to_string()),
]);
+ if let Some(bin) = bin.as_ref() {
+ command.args(["--bin", bin]);
+ }
+
if let Some(example) = example.as_ref() {
command.args(["--example", example]);
}
diff --git a/hydro_cli/src/core/hydroflow_crate/mod.rs b/hydro_cli/src/core/hydroflow_crate/mod.rs
index 7b379c4ffaf..19f1d6f034e 100644
--- a/hydro_cli/src/core/hydroflow_crate/mod.rs
+++ b/hydro_cli/src/core/hydroflow_crate/mod.rs
@@ -24,6 +24,7 @@ pub struct HydroflowCrate {
id: usize,
src: PathBuf,
on: Arc>,
+ bin: Option,
example: Option,
profile: Option,
features: Option>,
@@ -55,6 +56,7 @@ impl HydroflowCrate {
id: usize,
src: PathBuf,
on: Arc>,
+ bin: Option,
example: Option,
profile: Option,
features: Option>,
@@ -65,6 +67,7 @@ impl HydroflowCrate {
Self {
id,
src,
+ bin,
on,
example,
profile,
@@ -162,6 +165,7 @@ impl HydroflowCrate {
fn build(&mut self) -> JoinHandle> {
let src_cloned = self.src.canonicalize().unwrap();
+ let bin_cloned = self.bin.clone();
let example_cloned = self.example.clone();
let features_cloned = self.features.clone();
let host = self.on.clone();
@@ -170,6 +174,7 @@ impl HydroflowCrate {
tokio::task::spawn(build_crate(
src_cloned,
+ bin_cloned,
example_cloned,
profile_cloned,
target_type,
@@ -213,6 +218,7 @@ impl Service for HydroflowCrate {
.display_id
.clone()
.unwrap_or_else(|| format!("service/{}", self.id)),
+ None,
|| async {
let mut host_write = self.on.write().await;
let launched = host_write.provision(resource_result);
@@ -232,6 +238,7 @@ impl Service for HydroflowCrate {
.display_id
.clone()
.unwrap_or_else(|| format!("service/{}", self.id)),
+ None,
|| async {
let launched_host = self.launched_host.as_ref().unwrap();
@@ -256,7 +263,7 @@ impl Service for HydroflowCrate {
let formatted_bind_config = serde_json::to_string(&bind_config).unwrap();
// request stdout before sending config so we don't miss the "ready" response
- let stdout_receiver = binary.write().await.stdout().await;
+ let stdout_receiver = binary.write().await.cli_stdout().await;
binary
.write()
@@ -286,9 +293,9 @@ impl Service for HydroflowCrate {
.await
}
- async fn start(&mut self) {
+ async fn start(&mut self) -> Result<()> {
if self.started {
- return;
+ return Ok(());
}
let mut sink_ports = HashMap::new();
@@ -298,6 +305,15 @@ impl Service for HydroflowCrate {
let formatted_defns = serde_json::to_string(&sink_ports).unwrap();
+ let stdout_receiver = self
+ .launched_binary
+ .as_mut()
+ .unwrap()
+ .write()
+ .await
+ .cli_stdout()
+ .await;
+
self.launched_binary
.as_mut()
.unwrap()
@@ -309,7 +325,17 @@ impl Service for HydroflowCrate {
.await
.unwrap();
+ let start_ack_line = ProgressTracker::leaf(
+ "waiting for ack start".to_string(),
+ tokio::time::timeout(Duration::from_secs(60), stdout_receiver.recv()),
+ )
+ .await??;
+ if !start_ack_line.starts_with("ack start") {
+ bail!("expected ack start");
+ }
+
self.started = true;
+ Ok(())
}
async fn stop(&mut self) -> Result<()> {
diff --git a/hydro_cli/src/core/hydroflow_crate/ports.rs b/hydro_cli/src/core/hydroflow_crate/ports.rs
index a937570a81c..a025933455a 100644
--- a/hydro_cli/src/core/hydroflow_crate/ports.rs
+++ b/hydro_cli/src/core/hydroflow_crate/ports.rs
@@ -544,7 +544,6 @@ impl ServerConfig {
ServerConfig::TaggedUnwrap(underlying) => {
let loaded = underlying.load_instantiated(select).await;
- dbg!(&loaded);
if let ServerPort::Tagged(underlying, _) = loaded {
*underlying
} else {
diff --git a/hydro_cli/src/core/localhost.rs b/hydro_cli/src/core/localhost.rs
index fa565874fe7..512d2b3211c 100644
--- a/hydro_cli/src/core/localhost.rs
+++ b/hydro_cli/src/core/localhost.rs
@@ -22,6 +22,7 @@ use super::{
struct LaunchedLocalhostBinary {
child: RwLock,
stdin_sender: Sender,
+ stdout_cli_receivers: Arc>>>,
stdout_receivers: Arc>>>,
stderr_receivers: Arc>>>,
}
@@ -32,6 +33,13 @@ impl LaunchedBinary for LaunchedLocalhostBinary {
self.stdin_sender.clone()
}
+ async fn cli_stdout(&self) -> Receiver {
+ let mut receivers = self.stdout_cli_receivers.write().await;
+ let (sender, receiver) = async_channel::unbounded::();
+ receivers.push(sender);
+ receiver
+ }
+
async fn stdout(&self) -> Receiver {
let mut receivers = self.stdout_receivers.write().await;
let (sender, receiver) = async_channel::unbounded::();
@@ -72,14 +80,34 @@ struct LaunchedLocalhost {}
pub fn create_broadcast(
source: T,
default: impl Fn(String) + Send + 'static,
-) -> Arc>>> {
+) -> (
+ Arc>>>,
+ Arc>>>,
+) {
+ let cli_receivers = Arc::new(RwLock::new(Vec::>::new()));
let receivers = Arc::new(RwLock::new(Vec::>::new()));
+
+ let weak_cli_receivers = Arc::downgrade(&cli_receivers);
let weak_receivers = Arc::downgrade(&receivers);
tokio::spawn(async move {
let mut lines = BufReader::new(source).lines();
- while let Some(Result::Ok(line)) = lines.next().await {
+ 'line_loop: while let Some(Result::Ok(line)) = lines.next().await {
+ if let Some(cli_receivers) = weak_cli_receivers.upgrade() {
+ let mut cli_receivers = cli_receivers.write().await;
+ let mut successful_send = false;
+ for r in cli_receivers.iter() {
+ successful_send |= r.send(line.clone()).await.is_ok();
+ }
+
+ cli_receivers.retain(|r| !r.is_closed());
+
+ if successful_send {
+ continue 'line_loop;
+ }
+ }
+
if let Some(receivers) = weak_receivers.upgrade() {
let mut receivers = receivers.write().await;
let mut successful_send = false;
@@ -98,7 +126,7 @@ pub fn create_broadcast(
}
});
- receivers
+ (cli_receivers, receivers)
}
#[async_trait]
@@ -158,16 +186,18 @@ impl LaunchedHost for LaunchedLocalhost {
});
let id_clone = id.clone();
- let stdout_receivers = create_broadcast(child.stdout.take().unwrap(), move |s| {
- println!("[{id_clone}] {s}")
- });
- let stderr_receivers = create_broadcast(child.stderr.take().unwrap(), move |s| {
+ let (stdout_cli_receivers, stdout_receivers) =
+ create_broadcast(child.stdout.take().unwrap(), move |s| {
+ println!("[{id_clone}] {s}")
+ });
+ let (_, stderr_receivers) = create_broadcast(child.stderr.take().unwrap(), move |s| {
eprintln!("[{id}] {s}")
});
Ok(Arc::new(RwLock::new(LaunchedLocalhostBinary {
child: RwLock::new(child),
stdin_sender,
+ stdout_cli_receivers,
stdout_receivers,
stderr_receivers,
})))
diff --git a/hydro_cli/src/core/mod.rs b/hydro_cli/src/core/mod.rs
index c28d1f0fe43..66c82107e7c 100644
--- a/hydro_cli/src/core/mod.rs
+++ b/hydro_cli/src/core/mod.rs
@@ -69,6 +69,7 @@ pub struct ResourceResult {
#[async_trait]
pub trait LaunchedBinary: Send + Sync {
async fn stdin(&self) -> Sender;
+ async fn cli_stdout(&self) -> Receiver;
async fn stdout(&self) -> Receiver;
async fn stderr(&self) -> Receiver;
@@ -186,7 +187,7 @@ pub trait Service: Send + Sync {
async fn ready(&mut self) -> Result<()>;
/// Starts the service by having it connect to other services and start computations.
- async fn start(&mut self);
+ async fn start(&mut self) -> Result<()>;
/// Stops the service by having it disconnect from other services and stop computations.
async fn stop(&mut self) -> Result<()>;
diff --git a/hydro_cli/src/core/progress.rs b/hydro_cli/src/core/progress.rs
index 05a7d0d8df3..153f4d1b417 100644
--- a/hydro_cli/src/core/progress.rs
+++ b/hydro_cli/src/core/progress.rs
@@ -12,7 +12,6 @@ tokio::task_local! {
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum LeafStatus {
- Queued,
Started,
Finished,
}
@@ -20,7 +19,12 @@ pub enum LeafStatus {
#[derive(Debug)]
pub enum BarTree {
Root(Vec),
- Group(String, Arc, Vec),
+ Group(
+ String,
+ Arc,
+ Vec,
+ Option,
+ ),
Leaf(String, Arc, LeafStatus),
Finished,
}
@@ -29,26 +33,22 @@ impl BarTree {
fn get_pb(&self) -> Option<&Arc> {
match self {
BarTree::Root(_) => None,
- BarTree::Group(_, pb, _) | BarTree::Leaf(_, pb, _) => Some(pb),
+ BarTree::Group(_, pb, _, _) | BarTree::Leaf(_, pb, _) => Some(pb),
BarTree::Finished => None,
}
}
fn status(&self) -> LeafStatus {
match self {
- BarTree::Root(children) | BarTree::Group(_, _, children) => {
- if children
- .iter()
- .all(|child| child.status() == LeafStatus::Finished)
+ BarTree::Root(children) | BarTree::Group(_, _, children, _) => {
+ if children.len() > 0
+ && children
+ .iter()
+ .all(|child| child.status() == LeafStatus::Finished)
{
LeafStatus::Finished
- } else if children
- .iter()
- .any(|child| child.status() == LeafStatus::Started)
- {
- LeafStatus::Started
} else {
- LeafStatus::Queued
+ LeafStatus::Started
}
}
BarTree::Leaf(_, _, status) => status.clone(),
@@ -63,7 +63,7 @@ impl BarTree {
child.refresh_prefix(cur_path);
}
}
- BarTree::Group(name, pb, children) => {
+ BarTree::Group(name, pb, children, anticipated_total) => {
let mut path_with_group = cur_path.to_vec();
path_with_group.push(name.clone());
@@ -75,18 +75,26 @@ impl BarTree {
.iter()
.filter(|child| child.status() == LeafStatus::Started)
.count();
- let queued_count = children
- .iter()
- .filter(|child| child.status() == LeafStatus::Queued)
- .count();
-
- pb.set_prefix(format!(
- "{} ({}/{}/{})",
- path_with_group.join(" / "),
- finished_count,
- started_count,
- queued_count
- ));
+ let queued_count =
+ anticipated_total.map(|total| total - finished_count - started_count);
+
+ match queued_count {
+ Some(queued_count) => {
+ pb.set_prefix(format!(
+ "{} ({}/{}/{})",
+ path_with_group.join(" / "),
+ finished_count,
+ started_count,
+ queued_count
+ ));
+ }
+ None => pb.set_prefix(format!(
+ "{} ({}/{})",
+ path_with_group.join(" / "),
+ finished_count,
+ started_count
+ )),
+ }
for child in children {
child.refresh_prefix(&path_with_group);
}
@@ -106,7 +114,7 @@ impl BarTree {
}
match self {
- BarTree::Root(children) | BarTree::Group(_, _, children) => {
+ BarTree::Root(children) | BarTree::Group(_, _, children, _) => {
children[path[0]].find_node(&path[1..])
}
_ => panic!(),
@@ -134,12 +142,13 @@ impl ProgressTracker {
under_path: Vec,
name: String,
group: bool,
+ anticipated_total: Option,
progress: bool,
) -> (usize, Arc) {
let surrounding = self.tree.find_node(&under_path);
let (surrounding_children, surrounding_pb) = match surrounding {
BarTree::Root(children) => (children, None),
- BarTree::Group(_, pb, children) => (children, Some(pb)),
+ BarTree::Group(_, pb, children, _) => (children, Some(pb)),
_ => panic!(),
};
@@ -161,7 +170,7 @@ impl ProgressTracker {
let pb = Arc::new(created_bar);
if group {
- surrounding_children.push(BarTree::Group(name, pb.clone(), vec![]));
+ surrounding_children.push(BarTree::Group(name, pb.clone(), vec![], anticipated_total));
} else {
surrounding_children.push(BarTree::Leaf(name, pb.clone(), LeafStatus::Started));
}
@@ -189,7 +198,7 @@ impl ProgressTracker {
pub fn end_task(&mut self, path: Vec) {
match self.tree.find_node(&path[0..path.len() - 1]) {
- BarTree::Root(children) | BarTree::Group(_, _, children) => {
+ BarTree::Root(children) | BarTree::Group(_, _, children, _) => {
let removed = children[*path.last().unwrap()].get_pb().unwrap().clone();
children[*path.last().unwrap()] = BarTree::Finished;
self.multi_progress.remove(&removed);
@@ -213,11 +222,15 @@ impl ProgressTracker {
.get_or_init(|| Mutex::new(ProgressTracker::new()))
.lock()
.unwrap();
- progress_bar.multi_progress.println(msg).unwrap();
+
+ if progress_bar.multi_progress.println(msg).is_err() {
+ println!("{}", msg);
+ }
}
pub fn with_group<'a, T, F: Future