diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs
index 8c3c89a2a1b4..5af3d1a62fdc 100644
--- a/hydroflow_plus_test/src/cluster/paxos.rs
+++ b/hydroflow_plus_test/src/cluster/paxos.rs
@@ -58,8 +58,8 @@ struct P2a
{
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
struct P2b
{
- victory: bool,
ballot: Ballot,
+ max_ballot: Ballot,
slot: i32,
value: P,
}
@@ -111,7 +111,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
i_am_leader_send_timeout,
i_am_leader_check_timeout,
i_am_leader_check_timeout_delay_multiplier,
- a_to_proposers_p2b_forward_reference.map(q!(|p2b| p2b.ballot)),
+ a_to_proposers_p2b_forward_reference.map(q!(|p2b| p2b.max_ballot)),
a_log_forward_reference.map(q!(|(_ckpnt, log)| log.clone())),
);
@@ -621,14 +621,13 @@ fn p_p2a<'a, P: PaxosPayload>(
let (p_next_slot_complete_cycle, p_next_slot) =
proposers.tick_cycle::>();
let p_next_slot_after_reconciling_p1bs = p_max_slot
- .unwrap_or(proposers.singleton_each_tick(q!(-1)))
+ .map(q!(|max_slot| max_slot + 1))
+ .unwrap_or(proposers.singleton_each_tick(q!(0)))
// .inspect(q!(|max_slot| println!("{} p_max_slot: {:?}", context.current_tick(), max_slot)))
- .continue_unless(p_next_slot.clone())
- .map(q!(|max_slot| max_slot + 1));
+ .continue_unless(p_next_slot.clone());
// Send p2as
let p_indexed_payloads = c_to_proposers
- .clone()
.tick_batch()
.enumerate()
.cross_singleton(p_next_slot.clone())
@@ -638,36 +637,25 @@ fn p_p2a<'a, P: PaxosPayload>(
.map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { ballot: Ballot { num: ballot_num, proposer_id: p_id }, slot: next_slot + index as i32, value: payload }));
// .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a)));
let p_to_acceptors_p2a = p_log_to_recommit
- .union(p_indexed_payloads)
+ .union(p_indexed_payloads.clone())
.continue_if(p_is_leader.clone())
.all_ticks()
.broadcast_bincode_interleaved(acceptors);
- let p_num_payloads = c_to_proposers.clone().tick_batch().count();
- let p_exists_payloads = p_num_payloads
- .clone()
- .filter(q!(|num_payloads| *num_payloads > 0));
+ let p_num_payloads = p_indexed_payloads.count();
let p_next_slot_after_sending_payloads = p_num_payloads
- .continue_if(p_exists_payloads.clone())
.clone()
.cross_singleton(p_next_slot.clone())
.map(q!(
|(num_payloads, next_slot)| next_slot + num_payloads as i32
));
- let p_next_slot_if_no_payloads = p_next_slot.clone().continue_unless(p_exists_payloads);
- let p_new_next_slot_calculated = p_next_slot_after_reconciling_p1bs
+
+ let p_new_next_slot = p_next_slot_after_reconciling_p1bs
// .inspect(q!(|slot| println!("{} p_new_next_slot_after_reconciling_p1bs: {:?}", context.current_tick(), slot)))
.union(p_next_slot_after_sending_payloads)
// .inspect(q!(|slot| println!("{} p_next_slot_after_sending_payloads: {:?}", context.current_tick(), slot))))
- .union(p_next_slot_if_no_payloads)
- // .inspect(q!(|slot| println!("{} p_next_slot_if_no_payloads: {:?}", context.current_tick(), slot))))
.continue_if(p_is_leader.clone());
- let p_new_next_slot_default = p_is_leader // Default next slot to 0 if there haven't been any payloads at all
- .clone()
- .continue_unless(p_new_next_slot_calculated.clone())
- .map(q!(|_| 0));
- // .inspect(q!(|slot| println!("{} p_new_next_slot_default: {:?}", context.current_tick(), slot)));
- let p_new_next_slot = p_new_next_slot_calculated.union(p_new_next_slot_default);
+
p_next_slot_complete_cycle.complete_next_tick(p_new_next_slot);
(p_next_slot, p_to_acceptors_p2a)
}
@@ -767,8 +755,8 @@ fn acceptor_p2<'a, P: PaxosPayload, R>(
.map(q!(|(p2a, max_ballot)| (
p2a.ballot.proposer_id,
P2b {
- victory: p2a.ballot == max_ballot,
ballot: p2a.ballot,
+ max_ballot,
slot: p2a.slot,
value: p2a.value
}
@@ -785,13 +773,10 @@ fn p_p2b<'a, P: PaxosPayload>(
) -> Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>> {
let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposers.tick_cycle();
let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposers.tick_cycle();
- let p_p2b = a_to_proposers_p2b
- .clone()
- .tick_batch()
- .union(p_persisted_p2bs);
+ let p_p2b = a_to_proposers_p2b.tick_batch().union(p_persisted_p2bs);
let p_count_matching_p2bs = p_p2b
.clone()
- .filter_map(q!(|p2b| if p2b.victory {
+ .filter_map(q!(|p2b| if p2b.ballot == p2b.max_ballot {
// Only consider p2bs where max ballot = ballot, which means that no one preempted us
Some(((p2b.slot, p2b.ballot), p2b.value))
} else {
@@ -800,7 +785,6 @@ fn p_p2b<'a, P: PaxosPayload>(
.fold_keyed(
q!(|| (0, Default::default())),
q!(|accum, value| {
- // TODO(shadaj): why is sender unused? should we de-dup?
accum.0 += 1;
accum.1 = value;
}),
@@ -816,7 +800,6 @@ fn p_p2b<'a, P: PaxosPayload>(
let p_to_replicas = p_p2b_quorum_reached
.clone()
.anti_join(p_broadcasted_p2b_slots) // Only tell the replicas about committed values once
- .map(q!(|(slot, value)| (slot, value)))
.all_ticks();
let p_p2b_all_commit_slots =
diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap
index 4c2a5f103d18..169fe4a64112 100644
--- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap
+++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap
@@ -55,7 +55,7 @@ expression: built.ir()
},
},
Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | p2b . ballot }),
+ f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | p2b . max_ballot }),
input: CycleSource {
ident: Ident {
sym: cycle_0,
@@ -461,206 +461,152 @@ expression: built.ir()
2,
),
input: DeferTick(
- Union(
- Tee {
- inner: : Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , ()) , i32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
- input: CrossSingleton(
- Union(
+ Map {
+ f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , ()) , i32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
+ input: CrossSingleton(
+ Union(
+ Map {
+ f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , ()) , i32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
+ input: CrossSingleton(
Union(
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | max_slot + 1 }),
- input: Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , ()) , i32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
- input: CrossSingleton(
- Union(
- Tee {
- inner: : Reduce {
- f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }),
- input: Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }),
+ input: Tee {
+ inner: : Reduce {
+ f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }),
+ input: Map {
+ f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }),
+ input: Tee {
+ inner: : FoldKeyed {
+ init: stageleft :: runtime_support :: fn0_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , LogValue { ballot : Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } , value : Default :: default () }) }),
+ acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_entry , new_entry | { let same_values = new_entry . value == curr_entry . 1 . value ; let higher_ballot = new_entry . ballot > curr_entry . 1 . ballot ; if same_values { curr_entry . 0 += 1 ; } if higher_ballot { curr_entry . 1 . ballot = new_entry . ballot ; if ! same_values { curr_entry . 0 = 1 ; curr_entry . 1 . value = new_entry . value ; } } } }),
+ input: FlatMap {
+ f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > , std :: collections :: hash_map :: IntoIter < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | p1b . accepted . into_iter () }),
input: Tee {
- inner: : FoldKeyed {
- init: stageleft :: runtime_support :: fn0_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , LogValue { ballot : Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } , value : Default :: default () }) }),
- acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_entry , new_entry | { let same_values = new_entry . value == curr_entry . 1 . value ; let higher_ballot = new_entry . ballot > curr_entry . 1 . ballot ; if same_values { curr_entry . 0 += 1 ; } if higher_ballot { curr_entry . 1 . ballot = new_entry . ballot ; if ! same_values { curr_entry . 0 = 1 ; curr_entry . 1 . value = new_entry . value ; } } } }),
- input: FlatMap {
- f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > , std :: collections :: hash_map :: IntoIter < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | p1b . accepted . into_iter () }),
- input: Tee {
- inner: ,
- },
- },
- },
- },
- },
- },
- },
- Persist(
- Source {
- source: Iter(
- { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; - 1 } ; [e] },
- ),
- location_kind: Cluster(
- 2,
- ),
- },
- ),
- ),
- Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
- input: Filter {
- f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }),
- input: Fold {
- init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
- acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
- input: Tee {
- inner: : CycleSource {
- ident: Ident {
- sym: cycle_5,
- },
- location_kind: Cluster(
- 2,
- ),
+ inner: ,
},
},
},
},
},
- ),
- },
- },
- Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (num_payloads , next_slot) | next_slot + num_payloads as i32 }),
- input: CrossSingleton(
- Tee {
- inner: : Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , ()) , usize > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
- input: CrossSingleton(
- Tee {
- inner: : Fold {
- init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
- acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
- input: Tee {
- inner: : Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }),
- input: Network {
- from_location: Cluster(
- 0,
- ),
- from_key: None,
- to_location: Cluster(
- 2,
- ),
- to_key: None,
- serialize_pipeline: Some(
- Operator(
- Operator {
- path: "map",
- args: [
- "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > (& data) . unwrap () . into ()) }",
- ],
- },
- ),
- ),
- instantiate_fn: ,
- deserialize_pipeline: Some(
- Operator(
- Operator {
- path: "map",
- args: [
- "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > (& b) . unwrap ()) }",
- ],
- },
- ),
- ),
- input: CycleSource {
- ident: Ident {
- sym: cycle_0,
- },
- location_kind: Cluster(
- 0,
- ),
- },
- },
- },
- },
- },
- },
- Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
- input: Tee {
- inner: : Filter {
- f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; | num_payloads | * num_payloads > 0 }),
- input: Tee {
- inner: ,
- },
- },
- },
- },
- ),
- },
},
- Tee {
- inner: ,
- },
- ),
+ },
},
+ Persist(
+ Source {
+ source: Iter(
+ { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; 0 } ; [e] },
+ ),
+ location_kind: Cluster(
+ 2,
+ ),
+ },
+ ),
),
Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , ()) , i32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
- input: CrossSingleton(
- Tee {
- inner: ,
- },
- Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
- input: Filter {
- f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }),
- input: Fold {
- init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
- acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
- input: Tee {
- inner: ,
+ f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
+ input: Filter {
+ f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }),
+ input: Fold {
+ init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
+ acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
+ input: Tee {
+ inner: : CycleSource {
+ ident: Ident {
+ sym: cycle_5,
},
+ location_kind: Cluster(
+ 2,
+ ),
},
},
},
- ),
+ },
},
),
- Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
- input: Tee {
- inner: ,
- },
- },
- ),
- },
- },
- Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < bool , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | _ | 0 }),
- input: Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
- input: CrossSingleton(
- Tee {
- inner: ,
- },
- Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
- input: Filter {
- f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }),
- input: Fold {
+ },
+ Map {
+ f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (num_payloads , next_slot) | next_slot + num_payloads as i32 }),
+ input: CrossSingleton(
+ Tee {
+ inner: : Fold {
init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
- acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
+ acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
input: Tee {
- inner: ,
+ inner: : Map {
+ f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , i32) , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (((index , payload) , next_slot) , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot : next_slot + index as i32 , value : payload } }),
+ input: CrossSingleton(
+ CrossSingleton(
+ Enumerate(
+ Map {
+ f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }),
+ input: Network {
+ from_location: Cluster(
+ 0,
+ ),
+ from_key: None,
+ to_location: Cluster(
+ 2,
+ ),
+ to_key: None,
+ serialize_pipeline: Some(
+ Operator(
+ Operator {
+ path: "map",
+ args: [
+ "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > (& data) . unwrap () . into ()) }",
+ ],
+ },
+ ),
+ ),
+ instantiate_fn: ,
+ deserialize_pipeline: Some(
+ Operator(
+ Operator {
+ path: "map",
+ args: [
+ "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > (& b) . unwrap ()) }",
+ ],
+ },
+ ),
+ ),
+ input: CycleSource {
+ ident: Ident {
+ sym: cycle_0,
+ },
+ location_kind: Cluster(
+ 0,
+ ),
+ },
+ },
+ },
+ ),
+ Tee {
+ inner: ,
+ },
+ ),
+ Tee {
+ inner: ,
+ },
+ ),
+ },
},
},
},
- },
- ),
+ Tee {
+ inner: ,
+ },
+ ),
+ },
+ ),
+ Map {
+ f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
+ input: Tee {
+ inner: ,
+ },
},
- },
- ),
+ ),
+ },
),
},
CycleSink {
@@ -675,18 +621,18 @@ expression: built.ir()
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _value) | slot }),
input: Tee {
- inner: : FilterMap {
+ inner: : FilterMap {
f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload)) , core :: option :: Option < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , value)) | if count > f { Some ((slot , value)) } else { None } }),
input: Tee {
- inner: : FoldKeyed {
+ inner: : FoldKeyed {
init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , Default :: default ()) }),
acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | accum , value | { accum . 0 += 1 ; accum . 1 = value ; } }),
input: FilterMap {
- f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , core :: option :: Option < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | if p2b . victory { Some (((p2b . slot , p2b . ballot) , p2b . value)) } else { None } }),
+ f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , core :: option :: Option < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | if p2b . ballot == p2b . max_ballot { Some (((p2b . slot , p2b . ballot) , p2b . value)) } else { None } }),
input: Tee {
- inner: : Union(
+ inner: : Union(
Tee {
- inner: : Map {
+ inner: : Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }),
input: Network {
from_location: Cluster(
@@ -719,10 +665,10 @@ expression: built.ir()
),
),
input: Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | (p2a . ballot . proposer_id , P2b { victory : p2a . ballot == max_ballot , ballot : p2a . ballot , slot : p2a . slot , value : p2a . value }) }),
+ f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | (p2a . ballot . proposer_id , P2b { ballot : p2a . ballot , max_ballot , slot : p2a . slot , value : p2a . value }) }),
input: CrossSingleton(
Tee {
- inner: : Map {
+ inner: : Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }),
input: Network {
from_location: Cluster(
@@ -771,7 +717,7 @@ expression: built.ir()
f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ((slot , (count , entry)) , ballot_num) | if count <= f as u32 { Some (P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id , } , slot , value : entry . value , }) } else { None } }),
input: CrossSingleton(
Tee {
- inner: ,
+ inner: ,
},
Tee {
inner: ,
@@ -785,13 +731,13 @@ expression: built.ir()
FlatMap {
f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: ops :: Range < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | 0 .. max_slot }),
input: Tee {
- inner: ,
+ inner: ,
},
},
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }),
input: Tee {
- inner: ,
+ inner: ,
},
},
),
@@ -826,23 +772,8 @@ expression: built.ir()
},
),
},
- Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , i32) , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (((index , payload) , next_slot) , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot : next_slot + index as i32 , value : payload } }),
- input: CrossSingleton(
- CrossSingleton(
- Enumerate(
- Tee {
- inner: ,
- },
- ),
- Tee {
- inner: ,
- },
- ),
- Tee {
- inner: ,
- },
- ),
+ Tee {
+ inner: ,
},
),
Map {
@@ -882,10 +813,10 @@ expression: built.ir()
},
},
Tee {
- inner: : FilterMap {
+ inner: : FilterMap {
f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload)) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , _p2b)) | if count == 2 * f + 1 { Some (slot) } else { None } }),
input: Tee {
- inner: ,
+ inner: ,
},
},
},
@@ -906,11 +837,11 @@ expression: built.ir()
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | (p2b . slot , p2b) }),
input: Tee {
- inner: ,
+ inner: ,
},
},
Tee {
- inner: ,
+ inner: ,
},
),
},
@@ -932,7 +863,7 @@ expression: built.ir()
f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some (CheckpointOrP2a :: P2a (p2a)) } else { None } }),
input: CrossSingleton(
Tee {
- inner: ,
+ inner: ,
},
Tee {
inner: ,
@@ -951,7 +882,7 @@ expression: built.ir()
f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , ()) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }),
input: CrossSingleton(
Tee {
- inner: : ReduceKeyed {
+ inner: : ReduceKeyed {
f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_seq , seq | { if seq > * curr_seq { * curr_seq = seq ; } } }),
input: Persist(
Network {
@@ -1007,7 +938,7 @@ expression: built.ir()
init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
input: Tee {
- inner: ,
+ inner: ,
},
},
},
@@ -1041,7 +972,7 @@ expression: built.ir()
2,
),
input: Tee {
- inner: ,
+ inner: ,
},
},
CycleSink {
@@ -1058,10 +989,10 @@ expression: built.ir()
f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq > * highest_seq }),
input: CrossSingleton(
Tee {
- inner: : Sort(
+ inner: : Sort(
Union(
Tee {
- inner: : Map {
+ inner: : Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }),
input: Network {
from_location: Cluster(
@@ -1097,22 +1028,19 @@ expression: built.ir()
f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > > > (__hydroflow_plus_cluster_ids_1) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }),
input: Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (slot , data) | ReplicaPayload { seq : slot , key : data . key , value : data . value } }),
- input: Map {
- f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , value) | (slot , value) }),
- input: AntiJoin(
- Tee {
- inner: ,
- },
- CycleSource {
- ident: Ident {
- sym: cycle_6,
- },
- location_kind: Cluster(
- 2,
- ),
+ input: AntiJoin(
+ Tee {
+ inner: ,
+ },
+ CycleSource {
+ ident: Ident {
+ sym: cycle_6,
},
- ),
- },
+ location_kind: Cluster(
+ 2,
+ ),
+ },
+ ),
},
},
},
@@ -1130,12 +1058,12 @@ expression: built.ir()
),
},
Tee {
- inner: : Fold {
+ inner: : Fold {
init: stageleft :: runtime_support :: fn0_type_hint :: < i32 > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | - 1 }),
acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | filled_slot , (sorted_payload , highest_seq) | { let next_slot = std :: cmp :: max (* filled_slot , highest_seq) ; * filled_slot = if sorted_payload . seq == next_slot + 1 { sorted_payload . seq } else { * filled_slot } ; } }),
input: CrossSingleton(
Tee {
- inner: ,
+ inner: ,
},
Union(
CycleSource {
@@ -1172,23 +1100,23 @@ expression: built.ir()
),
input: DeferTick(
Tee {
- inner: : Map {
+ inner: : Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_kv_store , highest_seq) | highest_seq }),
input: Fold {
init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | (HashMap :: < u32 , String > :: new () , - 1) }),
acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | state , payload | { let kv_store = & mut state . 0 ; let last_seq = & mut state . 1 ; kv_store . insert (payload . key , payload . value) ; debug_assert ! (payload . seq == * last_seq + 1 , "Hole in log between seq {} and {}" , * last_seq , payload . seq) ; * last_seq = payload . seq ; } }),
input: Persist(
Tee {
- inner: : Map {
+ inner: : Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , _) | { sorted_payload } }),
input: Filter {
f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq <= * highest_seq }),
input: CrossSingleton(
Tee {
- inner: ,
+ inner: ,
},
Tee {
- inner: ,
+ inner: ,
},
),
},
@@ -1209,7 +1137,7 @@ expression: built.ir()
),
input: DeferTick(
Tee {
- inner: : FilterMap {
+ inner: : FilterMap {
f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , i32) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let checkpoint_frequency = 1usize ; move | (max_checkpointed_seq , new_highest_seq) | if new_highest_seq - max_checkpointed_seq >= checkpoint_frequency as i32 { Some (new_highest_seq) } else { None } }),
input: CrossSingleton(
Union(
@@ -1238,7 +1166,7 @@ expression: built.ir()
),
),
Tee {
- inner: ,
+ inner: ,
},
),
},
@@ -1253,7 +1181,7 @@ expression: built.ir()
1,
),
input: Tee {
- inner: ,
+ inner: ,
},
},
CycleSink {
@@ -1266,7 +1194,7 @@ expression: built.ir()
input: DeferTick(
AntiJoin(
Tee {
- inner: : Union(
+ inner: : Union(
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) , (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sender , replica_payload) | (replica_payload . key , sender) }),
input: Network {
@@ -1302,7 +1230,7 @@ expression: built.ir()
input: Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (ClusterId :: from_raw (payload . value . parse :: < u32 > () . unwrap ()) , payload) }),
input: Tee {
- inner: ,
+ inner: ,
},
},
},
@@ -1318,13 +1246,13 @@ expression: built.ir()
),
},
Tee {
- inner: : FilterMap {
+ inner: : FilterMap {
f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , usize) , core :: option :: Option < u32 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let f = 1usize ; move | (key , count) | { if count == f + 1 { Some (key) } else { None } } }),
input: FoldKeyed {
init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | 0 }),
acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | curr_count , _sender | { * curr_count += 1 ; } }),
input: Tee {
- inner: ,
+ inner: ,
},
},
},
@@ -1345,7 +1273,7 @@ expression: built.ir()
input: Union(
Union(
Tee {
- inner: : CycleSource {
+ inner: : CycleSource {
ident: Ident {
sym: cycle_2,
},
@@ -1359,9 +1287,9 @@ expression: built.ir()
input: Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: time :: SystemTime > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | SystemTime :: now () }),
input: Tee {
- inner: : Delta(
+ inner: : Delta(
Tee {
- inner: : Reduce {
+ inner: : Reduce {
f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }),
input: Persist(
Inspect {
@@ -1417,7 +1345,7 @@ expression: built.ir()
init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
input: Tee {
- inner: ,
+ inner: ,
},
},
},
@@ -1442,10 +1370,10 @@ expression: built.ir()
},
),
Tee {
- inner: : Map {
+ inner: : Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , (usize , std :: time :: SystemTime) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | key | (key as usize , SystemTime :: now ()) }),
input: Tee {
- inner: ,
+ inner: ,
},
},
},
@@ -1458,7 +1386,7 @@ expression: built.ir()
input: CrossSingleton(
CrossSingleton(
Tee {
- inner: : Source {
+ inner: : Source {
source: Interval(
{ use crate :: __staged :: cluster :: paxos_bench :: * ; Duration :: from_secs (1) },
),
@@ -1476,10 +1404,10 @@ expression: built.ir()
f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (std :: time :: SystemTime , std :: time :: SystemTime)) , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time) . unwrap () . as_micros ()) }),
input: Join(
Tee {
- inner: ,
+ inner: ,
},
Tee {
- inner: ,
+ inner: ,
},
),
},
@@ -1487,7 +1415,7 @@ expression: built.ir()
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < () , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }),
input: Tee {
- inner: ,
+ inner: ,
},
},
),
@@ -1509,7 +1437,7 @@ expression: built.ir()
init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , u32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
input: Tee {
- inner: ,
+ inner: ,
},
},
Map {
@@ -1520,7 +1448,7 @@ expression: built.ir()
init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
input: Tee {
- inner: ,
+ inner: ,
},
},
},
@@ -1532,7 +1460,7 @@ expression: built.ir()
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < () , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | (0 , true) }),
input: Tee {
- inner: ,
+ inner: ,
},
},
),
@@ -1552,17 +1480,17 @@ expression: built.ir()
FlatMap {
f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; let num_clients_per_node = 1usize ; move | leader_ballot | (0 .. num_clients_per_node) . map (move | i | (leader_ballot . leader_id () , ClientPayload { key : i as u32 , value : c_id . raw_id . to_string () })) }),
input: Tee {
- inner: ,
+ inner: ,
},
},
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (key , leader_ballot) | (leader_ballot . leader_id () , ClientPayload { key , value : c_id . raw_id . to_string () }) }),
input: CrossSingleton(
Tee {
- inner: ,
+ inner: ,
},
Tee {
- inner: ,
+ inner: ,
},
),
},