diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ff73a39ef0a..9538a1ec3cb 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -87,7 +87,7 @@ jobs: --bump ${{ inputs.bump }} --bump-dependencies auto ${{ inputs.execute && '--execute' || '--no-publish' }} hydroflow hydroflow_lang hydroflow_macro hydroflow_plus - hydroflow_datalog hydroflow_datalog_core + hydroflow_plus_std hydroflow_datalog hydroflow_datalog_core hydro_deploy hydro_cli hydroflow_deploy_integration stageleft stageleft_macro stageleft_tool multiplatform_test diff --git a/Cargo.lock b/Cargo.lock index 396a800a5c2..d26a114e4e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1607,6 +1607,19 @@ dependencies = [ "trybuild-internals-api", ] +[[package]] +name = "hydroflow_plus_std" +version = "0.10.0" +dependencies = [ + "async-ssh2-lite", + "ctor", + "hydro_deploy", + "hydroflow_plus", + "insta", + "stageleft", + "stageleft_tool", +] + [[package]] name = "hydroflow_plus_test" version = "0.0.0" @@ -1615,6 +1628,7 @@ dependencies = [ "futures", "hydro_deploy", "hydroflow_plus", + "hydroflow_plus_std", "insta", "rand", "serde", diff --git a/Cargo.toml b/Cargo.toml index 224bad27797..23491ccfe4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "hydroflow_lang", "hydroflow_macro", "hydroflow_plus", + "hydroflow_plus_std", "hydroflow_plus_test", "hydroflow_plus_test_local", "hydroflow_plus_test_local_macro", diff --git a/hydroflow_plus_std/CHANGELOG.md b/hydroflow_plus_std/CHANGELOG.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/hydroflow_plus_std/Cargo.toml b/hydroflow_plus_std/Cargo.toml new file mode 100644 index 00000000000..03992c88f27 --- /dev/null +++ b/hydroflow_plus_std/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "hydroflow_plus_std" +publish = true +version = "0.10.0" +edition = "2021" +license = "Apache-2.0" +documentation = "https://docs.rs/hydroflow_plus_std/" +description = "Standard library of distributed systems building blocks for Hydroflow+" + +[lints] +workspace = true + +[lib] +path = "src/lib.rs" + +[dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.10.0", default-features = false } +stageleft = { path = "../stageleft", version = "^0.5.0" } + +[build-dependencies] +stageleft_tool = { path = "../stageleft_tool", version = "^0.4.0" } + +[dev-dependencies] +hydroflow_plus = { path = "../hydroflow_plus", version = "^0.10.0" } +insta = "1.39" +hydro_deploy = { path = "../hydro_deploy/core", version = "^0.10.0" } +async-ssh2-lite = { version = "0.5.0", features = ["vendored-openssl"] } +ctor = "0.2.8" diff --git a/hydroflow_plus_std/build.rs b/hydroflow_plus_std/build.rs new file mode 100644 index 00000000000..99775c3c7da --- /dev/null +++ b/hydroflow_plus_std/build.rs @@ -0,0 +1,3 @@ +fn main() { + stageleft_tool::gen_final!(); +} diff --git a/hydroflow_plus_std/src/lib.rs b/hydroflow_plus_std/src/lib.rs new file mode 100644 index 00000000000..fc5a2ab71fb --- /dev/null +++ b/hydroflow_plus_std/src/lib.rs @@ -0,0 +1,13 @@ +stageleft::stageleft_no_entry_crate!(); + +pub mod quorum; +pub mod request_response; + +#[stageleft::runtime] +#[cfg(test)] +mod tests { + #[ctor::ctor] + fn init() { + hydroflow_plus::deploy::init_test(); + } +} diff --git a/hydroflow_plus_test/src/cluster/quorum.rs b/hydroflow_plus_std/src/quorum.rs similarity index 100% rename from hydroflow_plus_test/src/cluster/quorum.rs rename to hydroflow_plus_std/src/quorum.rs diff --git a/hydroflow_plus_test/src/cluster/request_response.rs b/hydroflow_plus_std/src/request_response.rs similarity index 100% rename from hydroflow_plus_test/src/cluster/request_response.rs rename to hydroflow_plus_std/src/request_response.rs diff --git a/hydroflow_plus_test/Cargo.toml b/hydroflow_plus_test/Cargo.toml index 6d57e17ebbe..aabdaf528a2 100644 --- a/hydroflow_plus_test/Cargo.toml +++ b/hydroflow_plus_test/Cargo.toml @@ -13,6 +13,7 @@ stageleft_devel = [] [dependencies] hydroflow_plus = { path = "../hydroflow_plus", version = "^0.10.0" } +hydroflow_plus_std = { path = "../hydroflow_plus_std", version = "^0.10.0" } tokio = { version = "1.29.0", features = [ "full" ] } stageleft = { path = "../stageleft", version = "^0.5.0" } rand = "0.8.0" diff --git a/hydroflow_plus_test/src/cluster/mod.rs b/hydroflow_plus_test/src/cluster/mod.rs index 88bcbe2d949..b4e5be9bb00 100644 --- a/hydroflow_plus_test/src/cluster/mod.rs +++ b/hydroflow_plus_test/src/cluster/mod.rs @@ -4,7 +4,5 @@ pub mod map_reduce; pub mod paxos; pub mod paxos_bench; pub mod paxos_kv; -pub mod quorum; -pub mod request_response; pub mod simple_cluster; pub mod two_pc; diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index af3a3ca04f3..66b28264aba 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -4,12 +4,11 @@ use std::hash::Hash; use std::time::Duration; use hydroflow_plus::*; +use hydroflow_plus_std::quorum::{collect_quorum, collect_quorum_with_response}; +use hydroflow_plus_std::request_response::join_responses; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use super::quorum::{collect_quorum, collect_quorum_with_response}; -use super::request_response::join_responses; - pub struct Proposer {} pub struct Acceptor {} diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index 4d87409fbde..7517bd2cc94 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -3,11 +3,11 @@ use std::rc::Rc; use std::time::Duration; use hydroflow_plus::*; +use hydroflow_plus_std::quorum::collect_quorum; use tokio::time::Instant; use super::paxos::{Acceptor, Ballot, Proposer}; use super::paxos_kv::{paxos_kv, KvPayload, Replica}; -use super::quorum::collect_quorum; pub struct Client {} diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index 30433787404..c2c3c9e7119 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -220,11 +220,11 @@ expression: built.ir() input: DeferTick( Difference( FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , (usize , usize)) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: Ballot > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , (usize , usize)) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: Ballot > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), input: Tee { inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use crate :: __staged :: cluster :: quorum :: * ; move | | (0 , 0) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot > , () > ({ use crate :: __staged :: cluster :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | | (0 , 0) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot > , () > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), input: Tee { inner: : Chain( CycleSource { @@ -426,7 +426,7 @@ expression: built.ir() }, Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , (usize , usize)) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: Ballot > > ({ use crate :: __staged :: cluster :: quorum :: * ; let max__free = 3usize ; move | (key , (success , error)) | if (success + error) >= max__free { Some (key) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , (usize , usize)) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: Ballot > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; let max__free = 3usize ; move | (key , (success , error)) | if (success + error) >= max__free { Some (key) } else { None } }), input: Tee { inner: , }, @@ -483,14 +483,14 @@ expression: built.ir() acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: vec :: Vec < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | logs , log | { logs . push (log) ; } }), input: Persist( FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) > > ({ use crate :: __staged :: cluster :: quorum :: * ; move | (key , res) | match res { Ok (v) => Some ((key , v)) , Err (_) => None , } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > >) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (v) => Some ((key , v)) , Err (_) => None , } }), input: AntiJoin( AntiJoin( Tee { inner: , }, FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , (usize , usize)) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: Ballot > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success < min__free { Some (key) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , (usize , usize)) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: Ballot > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success < min__free { Some (key) } else { None } }), input: Tee { inner: , }, @@ -552,7 +552,7 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ , ballot) | ballot }), input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , core :: result :: Result < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < (hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), input: Tee { inner: , }, @@ -745,11 +745,11 @@ expression: built.ir() Difference( Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), input: Tee { inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use crate :: __staged :: cluster :: quorum :: * ; move | | (0 , 0) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot > , () > ({ use crate :: __staged :: cluster :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | | (0 , 0) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot > , () > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), input: Tee { inner: : Chain( CycleSource { @@ -921,7 +921,7 @@ expression: built.ir() }, Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let max__free = 3usize ; move | (key , (success , error)) | if (success + error) >= max__free { Some (key) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , usize)) , core :: option :: Option < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; let max__free = 3usize ; move | (key , (success , error)) | if (success + error) >= max__free { Some (key) } else { None } }), input: Tee { inner: , }, @@ -982,7 +982,7 @@ expression: built.ir() ), }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ()) , (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > ({ use crate :: __staged :: cluster :: request_response :: * ; | (key , _) | key }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ()) , (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) > ({ use hydroflow_plus_std :: __staged :: request_response :: * ; | (key , _) | key }), input: Tee { inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , ()) > ({ use crate :: __staged :: cluster :: paxos :: * ; | k | (k , ()) }), @@ -1129,7 +1129,7 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ , ballot) | ballot }), input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use crate :: __staged :: cluster :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: result :: Result < () , hydroflow_plus_test :: cluster :: paxos :: Ballot >) , core :: option :: Option < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | (key , res) | match res { Ok (_) => None , Err (e) => Some ((key , e)) , } }), input: Tee { inner: , }, @@ -1194,7 +1194,7 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , ())) , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((slot , _ballot) , (value , _)) | (slot , value) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , ())) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , ())) > ({ use crate :: __staged :: cluster :: request_response :: * ; | (key , (meta , resp)) | (key , (meta , resp)) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , ())) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydroflow_plus :: location :: cluster :: cluster_id :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , u32) > > , ())) > ({ use hydroflow_plus_std :: __staged :: request_response :: * ; | (key , (meta , resp)) | (key , (meta , resp)) }), input: Join( Tee { inner: , @@ -1526,11 +1526,11 @@ expression: built.ir() }, Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , (usize , usize)) , core :: option :: Option < (u32 , u32) > > ({ use crate :: __staged :: cluster :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((u32 , u32) , (usize , usize)) , core :: option :: Option < (u32 , u32) > > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; let min__free = 2usize ; move | (key , (success , _error)) | if success >= min__free { Some (key) } else { None } }), input: Tee { inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use crate :: __staged :: cluster :: quorum :: * ; move | | (0 , 0) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , () > , () > ({ use crate :: __staged :: cluster :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , usize) > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | | (0 , 0) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , usize) , core :: result :: Result < () , () > , () > ({ use hydroflow_plus_std :: __staged :: quorum :: * ; move | accum , value | { if value . is_ok () { accum . 0 += 1 ; } else { accum . 1 += 1 ; } } }), input: Tee { inner: , }, diff --git a/hydroflow_plus_test/src/cluster/two_pc.rs b/hydroflow_plus_test/src/cluster/two_pc.rs index 4a5f8220cb6..429de9ab633 100644 --- a/hydroflow_plus_test/src/cluster/two_pc.rs +++ b/hydroflow_plus_test/src/cluster/two_pc.rs @@ -1,6 +1,5 @@ use hydroflow_plus::*; - -use super::quorum::collect_quorum; +use hydroflow_plus_std::quorum::collect_quorum; // if the variable start with p, that means current work is at the participant side. if start with c, at coordinator side. //