From dff2a40669736014349cf12744d6a057a7992e11 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Wed, 30 Oct 2024 14:07:46 -0700 Subject: [PATCH] refactor(paxos): start splitting out leader election into a separate module (#1485) --- [//]: # (BEGIN SAPLING FOOTER) Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/hydro-project/hydroflow/pull/1485). * #1493 * #1492 * #1489 * #1488 * #1487 * #1486 * __->__ #1485 --- hydroflow_plus/src/singleton.rs | 4 + hydroflow_plus_test/src/cluster/paxos.rs | 240 ++-- ...cluster__paxos_bench__tests__paxos_ir.snap | 1035 +++++++++-------- 3 files changed, 633 insertions(+), 646 deletions(-) diff --git a/hydroflow_plus/src/singleton.rs b/hydroflow_plus/src/singleton.rs index 41140422696..54374e6c3f1 100644 --- a/hydroflow_plus/src/singleton.rs +++ b/hydroflow_plus/src/singleton.rs @@ -580,6 +580,10 @@ impl<'a, T, N: Location<'a>> Optional { .map(q!(|(d, _signal)| d)) } + pub fn then(self, value: Singleton) -> Optional { + value.continue_if(self) + } + pub fn continue_unless( self, other: Optional, diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index 488be19c01e..0667dae4e9d 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -21,12 +21,12 @@ pub trait PaxosPayload: pub struct Ballot { // Note: Important that num comes before id, since Ord is defined lexicographically pub num: u32, - pub id: ClusterId, + pub proposer_id: ClusterId, } impl LeaderElected for Ballot { fn leader_id(&self) -> ClusterId { - self.id + self.proposer_id } } @@ -57,8 +57,8 @@ struct P2a

{ #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] struct P2b

{ + victory: bool, ballot: Ballot, - max_ballot: Ballot, slot: i32, value: P, } @@ -119,7 +119,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( let (p_is_leader, p_log_to_try_commit, p_max_slot, p_log_holes) = p_p1b( &proposers, - a_to_proposers_p1b.inspect(q!(|(_, p1b)| println!("Proposer received P1b: {:?}", p1b))), + a_to_proposers_p1b.inspect(q!(|p1b| println!("Proposer received P1b: {:?}", p1b))), p_ballot_num.clone(), p_has_largest_ballot, f, @@ -152,7 +152,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( let p_to_clients_new_leader_elected = p_is_leader.clone() .continue_unless(p_next_slot) .cross_singleton(p_ballot_num) - .map(q!(move |(_is_leader, ballot_num)| Ballot { num: ballot_num, id: p_id})) // Only tell the clients once when leader election concludes + .map(q!(move |(_is_leader, ballot_num)| Ballot { num: ballot_num, proposer_id: p_id})) // Only tell the clients once when leader election concludes .all_ticks(); // End tell clients that leader election has completed let p_to_replicas = p_p2b(&proposers, a_to_proposers_p2b, f); @@ -197,8 +197,8 @@ fn acceptor<'a, P: PaxosPayload, R>( acceptors: &Cluster<'a, Acceptor>, f: usize, ) -> ( - Stream<(ClusterId, P1b

), Unbounded, NoTick, Cluster<'a, Proposer>>, - Stream<(ClusterId, P2b

), Unbounded, NoTick, Cluster<'a, Proposer>>, + Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, + Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, ) { // Get the latest checkpoint sequence per replica let a_checkpoint_largest_seqs = @@ -229,7 +229,7 @@ fn acceptor<'a, P: PaxosPayload, R>( // Create tuple with checkpoint number and dummy p2a ballot: Ballot { num: 0, - id: ClusterId::from_raw(0) + proposer_id: ClusterId::from_raw(0) }, slot: -1, value: Default::default() @@ -243,7 +243,7 @@ fn acceptor<'a, P: PaxosPayload, R>( .max() .unwrap_or(acceptors.singleton(q!(Ballot { num: 0, - id: ClusterId::from_raw(0) + proposer_id: ClusterId::from_raw(0) }))); let a_p2as_to_place_in_log = p_to_acceptors_p2a .clone() @@ -303,7 +303,7 @@ fn acceptor<'a, P: PaxosPayload, R>( .cross_singleton(a_max_ballot.clone().latest_tick()) .cross_singleton(a_log) .map(q!(|((p1a, max_ballot), (_prev_checkpoint, log))| ( - p1a.ballot.id, + p1a.ballot.proposer_id, P1b { ballot: p1a.ballot, max_ballot, @@ -311,33 +311,27 @@ fn acceptor<'a, P: PaxosPayload, R>( } ))) .all_ticks() - .send_bincode(proposers); + .send_bincode_interleaved(proposers); let a_to_proposers_p2b_new = p_to_acceptors_p2a .tick_batch() .cross_singleton(a_max_ballot.latest_tick()) .map(q!(|(p2a, max_ballot)| ( - p2a.ballot.id, + p2a.ballot.proposer_id, P2b { + victory: p2a.ballot == max_ballot, ballot: p2a.ballot, - max_ballot, slot: p2a.slot, value: p2a.value } ))) .all_ticks() - .send_bincode(proposers); + .send_bincode_interleaved(proposers); (a_to_proposers_p1b_new, a_to_proposers_p2b_new) } -#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn p_p2b<'a, P: PaxosPayload>( proposers: &Cluster<'a, Proposer>, - a_to_proposers_p2b: Stream< - (ClusterId, P2b

), - Unbounded, - NoTick, - Cluster<'a, Proposer>, - >, + a_to_proposers_p2b: Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, f: usize, ) -> Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>> { let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposers.tick_cycle(); @@ -348,62 +342,55 @@ fn p_p2b<'a, P: PaxosPayload>( .union(p_persisted_p2bs); let p_count_matching_p2bs = p_p2b .clone() - .filter_map(q!(|(sender, p2b)| if p2b.ballot == p2b.max_ballot { + .filter_map(q!(|p2b| if p2b.victory { // Only consider p2bs where max ballot = ballot, which means that no one preempted us - Some((p2b.slot, (sender, p2b))) + Some(((p2b.slot, p2b.ballot), p2b.value)) } else { None })) .fold_keyed( - q!(|| ( - 0, - P2b { - ballot: Ballot { - num: 0, - id: ClusterId::from_raw(0) - }, - max_ballot: Ballot { - num: 0, - id: ClusterId::from_raw(0) - }, - slot: 0, - value: Default::default() - } - )), - q!(|accum, (_sender, p2b)| { + q!(|| (0, Default::default())), + q!(|accum, value| { + // TODO(shadaj): why is sender unused? should we de-dup? accum.0 += 1; - accum.1 = p2b; + accum.1 = value; }), ); - let p_p2b_quorum_reached = p_count_matching_p2bs - .clone() - .filter(q!(move |(_slot, (count, _p2b))| *count > f)); + let p_p2b_quorum_reached = + p_count_matching_p2bs + .clone() + .filter_map(q!(move |((slot, _ballot), (count, value))| if count > f { + Some((slot, value)) + } else { + None + })); let p_to_replicas = p_p2b_quorum_reached .clone() .anti_join(p_broadcasted_p2b_slots) // Only tell the replicas about committed values once - .map(q!(|(_slot, (_count, p2b))| (p2b.slot, p2b.value))) + .map(q!(|(slot, value)| (slot, value))) .all_ticks(); let p_p2b_all_commit_slots = - p_count_matching_p2bs - .clone() - .filter_map(q!(move |(slot, (count, _p2b))| if count == 2 * f + 1 { + p_count_matching_p2bs.clone().filter_map(q!( + move |((slot, _ballot), (count, _p2b))| if count == 2 * f + 1 { Some(slot) } else { None - })); + } + )); // p_p2b_all_commit_slots.inspect(q!(|slot: i32| println!("Proposer slot all received: {:?}", slot))); let p_broadcasted_p2b_slots_new = p_p2b_quorum_reached .clone() - .map(q!(|(slot, (_count, _p2b))| slot)) + .map(q!(|(slot, _value)| slot)) .filter_not_in(p_p2b_all_commit_slots.clone()); // p_broadcasted_p2b_slots_new.inspect(q!(|slot: i32| println!("Proposer slot broadcasted: {:?}", slot))); p_broadcasted_p2b_slots_complete_cycle.complete_next_tick(p_broadcasted_p2b_slots_new); let p_persisted_p2bs_new = p_p2b .clone() - .map(q!(|(sender, p2b)| (p2b.slot, (sender, p2b)))) + .map(q!(|p2b| (p2b.slot, p2b))) .anti_join(p_p2b_all_commit_slots.clone()) - .map(q!(|(_slot, (sender, p2b))| (sender, p2b))); + .map(q!(|(_slot, p2b)| p2b)); + // TOOD: only persist if we are the leader // p_persisted_p2bs_new.inspect(q!(|(sender, p2b): (u32, P2b)| println!("Proposer persisting p2b: {:?}", p2b))); p_persisted_p2bs_complete_cycle.complete_next_tick(p_persisted_p2bs_new); p_to_replicas @@ -446,7 +433,7 @@ fn p_p2a<'a, P: PaxosPayload>( // .inspect(q!(|next| println!("{} p_indexed_payloads next slot: {}", context.current_tick(), next)))) .cross_singleton(p_ballot_num.clone()) // .inspect(q!(|ballot_num| println!("{} p_indexed_payloads ballot_num: {}", context.current_tick(), ballot_num)))) - .map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { ballot: Ballot { num: ballot_num, id: p_id }, slot: next_slot + index as i32, value: payload })); + .map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { ballot: Ballot { num: ballot_num, proposer_id: p_id }, slot: next_slot + index as i32, value: payload })); // .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a))); let p_to_acceptors_p2a = p_log_to_try_commit .union(p_log_holes) @@ -489,12 +476,7 @@ fn p_p2a<'a, P: PaxosPayload>( #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn p_p1b<'a, P: PaxosPayload>( proposers: &Cluster<'a, Proposer>, - a_to_proposers_p1b: Stream< - (ClusterId, P1b

), - Unbounded, - NoTick, - Cluster<'a, Proposer>, - >, + a_to_proposers_p1b: Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, p_ballot_num: Singleton>, p_has_largest_ballot: Optional<(Ballot, u32), Bounded, Tick, Cluster<'a, Proposer>>, f: usize, @@ -506,30 +488,26 @@ fn p_p1b<'a, P: PaxosPayload>( ) { let p_id = proposers.self_id(); let p_relevant_p1bs = a_to_proposers_p1b - .clone() .tick_prefix() + // NOTE: because `p_ballot_num` grows monotonically across ticks, we could garbage gollect + // but we don't do that here since leader election is a rare event .cross_singleton(p_ballot_num.clone()) - .filter(q!(move |((_sender, p1b), ballot_num)| p1b.ballot - == Ballot { - num: *ballot_num, - id: p_id + .filter(q!(move |(p1b, ballot_num)| p1b.ballot.num == *ballot_num && p1b.ballot.proposer_id == p_id)) + .map(q!(|t| t.0)); + let p_received_quorum_of_p1bs = + p_relevant_p1bs + .clone() + .count() + .filter_map(q!(move |num_received| if num_received > f { + Some(true) + } else { + None })); - let p_received_quorum_of_p1bs = p_relevant_p1bs - .clone() - .map(q!(|((sender, _p1b), _ballot_num)| { sender })) - .unique() - .count() - .filter_map(q!(move |num_received| if num_received > f { - Some(true) - } else { - None - })); let p_is_leader = p_received_quorum_of_p1bs.continue_if(p_has_largest_ballot.clone()); let p_p1b_highest_entries_and_count = p_relevant_p1bs - .clone() - .flat_map(q!(|((_, p1b), _)| p1b.accepted.into_iter())) // Convert HashMap log back to stream - .fold_keyed(q!(|| (0, LogValue { ballot: Ballot { num: 0, id: ClusterId::from_raw(0) }, value: Default::default() })), q!(|curr_entry, new_entry| { + .flat_map(q!(|p1b| p1b.accepted.into_iter())) // Convert HashMap log back to stream + .fold_keyed(q!(|| (0, LogValue { ballot: Ballot { num: 0, proposer_id: ClusterId::from_raw(0) }, value: Default::default() })), q!(|curr_entry, new_entry| { let same_values = new_entry.value == curr_entry.1.value; let higher_ballot = new_entry.ballot > curr_entry.1.ballot; // Increment count if the values are the same @@ -554,7 +532,7 @@ fn p_p1b<'a, P: PaxosPayload>( Some(P2a { ballot: Ballot { num: ballot_num, - id: p_id, + proposer_id: p_id, }, slot, value: entry.value, @@ -578,7 +556,7 @@ fn p_p1b<'a, P: PaxosPayload>( .map(q!(move |(slot, ballot_num)| P2a { ballot: Ballot { num: ballot_num, - id: p_id + proposer_id: p_id }, slot, value: Default::default() @@ -587,36 +565,21 @@ fn p_p1b<'a, P: PaxosPayload>( } // Proposer logic to calculate the largest ballot received so far. -#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn p_max_ballot<'a, P: PaxosPayload>( proposers: &Cluster<'a, Proposer>, - a_to_proposers_p1b: Stream< - (ClusterId, P1b

), - Unbounded, - NoTick, - Cluster<'a, Proposer>, - >, - a_to_proposers_p2b: Stream< - (ClusterId, P2b

), - Unbounded, - NoTick, - Cluster<'a, Proposer>, - >, + a_to_proposers_p1b: Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, + a_to_proposers_p2b: Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, p_to_proposers_i_am_leader: Stream>, ) -> Singleton> { - let p_received_p1b_ballots = a_to_proposers_p1b - .clone() - .map(q!(|(_, p1b)| p1b.max_ballot)); - let p_received_p2b_ballots = a_to_proposers_p2b - .clone() - .map(q!(|(_, p2b)| p2b.max_ballot)); + let p_received_p1b_ballots = a_to_proposers_p1b.clone().map(q!(|p1b| p1b.max_ballot)); + let p_received_p2b_ballots = a_to_proposers_p2b.clone().map(q!(|p2b| p2b.ballot)); p_received_p1b_ballots .union(p_received_p2b_ballots) .union(p_to_proposers_i_am_leader) .max() .unwrap_or(proposers.singleton(q!(Ballot { num: 0, - id: ClusterId::from_raw(0) + proposer_id: ClusterId::from_raw(0) }))) } @@ -640,7 +603,7 @@ fn p_ballot_calc<'a>( if received_max_ballot > (Ballot { num: ballot_num, - id: p_id, + proposer_id: p_id, }) { received_max_ballot.num + 1 @@ -657,7 +620,7 @@ fn p_ballot_calc<'a>( move |(received_max_ballot, ballot_num)| *received_max_ballot <= Ballot { num: *ballot_num, - id: p_id + proposer_id: p_id } )); @@ -685,21 +648,55 @@ fn p_p1a<'a>( Stream>, ) { let p_id = proposers.self_id(); - let p_to_proposers_i_am_leader_new = p_ballot_num + let p_to_proposers_i_am_leader_new = p_is_leader .clone() + .then(p_ballot_num.clone()) + .latest() + .sample_every(q!(Duration::from_secs(i_am_leader_send_timeout))) + .map(q!(move |ballot_num| Ballot { + num: ballot_num, + proposer_id: p_id + })) + .broadcast_bincode_interleaved(proposers); + + let p_leader_expired = p_leader_expired( + p_to_proposers_i_am_leader, + p_is_leader, + i_am_leader_check_timeout, + ); + + let p_id = proposers.self_id(); + + // Add random delay depending on node ID so not everyone sends p1a at the same time + let p_to_acceptors_p1a = p_leader_expired + .then(p_ballot_num) .continue_if( proposers - .source_interval(q!(Duration::from_secs(i_am_leader_send_timeout))) + .source_interval_delayed( + q!(Duration::from_secs( + (p_id.raw_id * i_am_leader_check_timeout_delay_multiplier as u32).into() + )), + q!(Duration::from_secs(i_am_leader_check_timeout)), + ) .latest_tick(), ) - .continue_if(p_is_leader.clone()) - .map(q!(move |ballot_num| Ballot { - num: ballot_num, - id: p_id + .map(q!(move |ballot_num| P1a { + ballot: Ballot { + num: ballot_num, + proposer_id: p_id + } })) .all_ticks() - .broadcast_bincode_interleaved(proposers); + .inspect(q!(|_| println!("Proposer leader expired, sending P1a"))) + .broadcast_bincode_interleaved(acceptors); + (p_to_proposers_i_am_leader_new, p_to_acceptors_p1a) +} +fn p_leader_expired<'a>( + p_to_proposers_i_am_leader: Stream>, + p_is_leader: Optional>, + i_am_leader_check_timeout: u64, // How often to check if heartbeat expired +) -> Optional, Bounded, Tick, Cluster<'a, Proposer>> { let p_latest_received_i_am_leader = p_to_proposers_i_am_leader.clone().fold( q!(|| None), q!(|latest, _| { @@ -707,34 +704,17 @@ fn p_p1a<'a>( *latest = Some(Instant::now()); }), ); - // Add random delay depending on node ID so not everyone sends p1a at the same time - let p_leader_expired = proposers.source_interval_delayed(q!(Duration::from_secs((p_id.raw_id * i_am_leader_check_timeout_delay_multiplier as u32).into())), q!(Duration::from_secs(i_am_leader_check_timeout))) - .cross_singleton(p_latest_received_i_am_leader.clone()) + + p_latest_received_i_am_leader + .clone() .latest_tick() - // .inspect(q!(|v| println!("Proposer checking if leader expired"))) - // .continue_if(p_is_leader.clone().count().filter(q!(|c| *c == 0)).inspect(q!(|c| println!("Proposer is_leader count: {}", c)))) .continue_unless(p_is_leader) - .filter(q!(move |(_, latest_received_i_am_leader)| { + .filter(q!(move |latest_received_i_am_leader| { if let Some(latest_received_i_am_leader) = latest_received_i_am_leader { - (Instant::now().duration_since(*latest_received_i_am_leader)) > Duration::from_secs(i_am_leader_check_timeout) + (Instant::now().duration_since(*latest_received_i_am_leader)) + > Duration::from_secs(i_am_leader_check_timeout) } else { true } - })); - p_leader_expired - .clone() - .all_ticks() - .for_each(q!(|_| println!("Proposer leader expired"))); - - let p_to_acceptors_p1a = p_ballot_num - .continue_if(p_leader_expired) - .map(q!(move |ballot_num| P1a { - ballot: Ballot { - num: ballot_num, - id: p_id - } })) - .all_ticks() - .broadcast_bincode_interleaved(acceptors); - (p_to_proposers_i_am_leader_new, p_to_acceptors_p1a) } diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index f20193cd83b..af57d72b873 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -22,7 +22,7 @@ expression: built.ir() 2, ), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , u32 > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (received_max_ballot , ballot_num) | { if received_max_ballot > (Ballot { num : ballot_num , id : p_id , }) { received_max_ballot . num + 1 } else { ballot_num } } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , u32 > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (received_max_ballot , ballot_num) | { if received_max_ballot > (Ballot { num : ballot_num , proposer_id : p_id , }) { received_max_ballot . num + 1 } else { ballot_num } } }), input: CrossSingleton( Tee { inner: : Union( @@ -32,7 +32,7 @@ expression: built.ir() Union( Union( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ , p1b) | p1b . max_ballot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | p1b . max_ballot }), input: Tee { inner: : CycleSource { ident: Ident { @@ -45,7 +45,7 @@ expression: built.ir() }, }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ , p2b) | p2b . max_ballot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | p2b . ballot }), input: Tee { inner: : CycleSource { ident: Ident { @@ -74,7 +74,7 @@ expression: built.ir() Persist( Source { source: Iter( - { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; Ballot { num : 0 , id : ClusterId :: from_raw (0) } } ; [e] }, + { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } } ; [e] }, ), location_kind: Cluster( 2, @@ -108,105 +108,6 @@ expression: built.ir() ), }, }, - ForEach { - f: stageleft :: runtime_support :: fn1_type_hint :: < (tokio :: time :: Instant , core :: option :: Option < tokio :: time :: Instant >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | _ | println ! ("Proposer leader expired") }), - input: Tee { - inner: : Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (tokio :: time :: Instant , core :: option :: Option < tokio :: time :: Instant >) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; move | (_ , latest_received_i_am_leader) | { if let Some (latest_received_i_am_leader) = latest_received_i_am_leader { (Instant :: now () . duration_since (* latest_received_i_am_leader)) > Duration :: from_secs (i_am_leader_check_timeout) } else { true } } }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((tokio :: time :: Instant , core :: option :: Option < tokio :: time :: Instant >) , ()) , (tokio :: time :: Instant , core :: option :: Option < tokio :: time :: Instant >) > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), - input: CrossSingleton( - CrossSingleton( - Source { - source: Stream( - { use hydroflow_plus :: __staged :: location :: * ; let delay = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout_delay_multiplier = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; Duration :: from_secs ((p_id . raw_id * i_am_leader_check_timeout_delay_multiplier as u32) . into ()) } ; let interval = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; Duration :: from_secs (i_am_leader_check_timeout) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval_at (tokio :: time :: Instant :: now () + delay , interval)) }, - ), - location_kind: Cluster( - 2, - ), - }, - Tee { - inner: : Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < tokio :: time :: Instant > > ({ use crate :: __staged :: cluster :: paxos :: * ; | | None }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | latest , _ | { * latest = Some (Instant :: now ()) ; } }), - input: Persist( - Tee { - inner: , - }, - ), - }, - }, - ), - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), - input: Tee { - inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), - input: CrossSingleton( - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received > f { Some (true) } else { None } }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), - input: Unique( - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , u32) , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((sender , _p1b) , _ballot_num) | { sender } }), - input: Tee { - inner: : Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ((_sender , p1b) , ballot_num) | p1b . ballot == Ballot { num : * ballot_num , id : p_id } }), - input: CrossSingleton( - Persist( - Tee { - inner: : Inspect { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ , p1b) | println ! ("Proposer received P1b: {:?}" , p1b) }), - input: Tee { - inner: , - }, - }, - }, - ), - Tee { - inner: , - }, - ), - }, - }, - }, - ), - }, - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Tee { - inner: : Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (received_max_ballot , ballot_num) | * received_max_ballot <= Ballot { num : * ballot_num , id : p_id } }), - input: CrossSingleton( - Tee { - inner: , - }, - Tee { - inner: , - }, - ), - }, - }, - }, - ), - }, - }, - }, - }, - }, - ), - }, - }, - }, - }, CycleSink { ident: Ident { sym: cycle_0, @@ -249,7 +150,7 @@ expression: built.ir() input: FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > > (__hydroflow_plus_cluster_ids_2) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ballot_num | Ballot { num : ballot_num , id : p_id } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ballot_num | Ballot { num : ballot_num , proposer_id : p_id } }), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( @@ -260,22 +161,70 @@ expression: built.ir() inner: , }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Source { - source: Interval( - { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_send_timeout = 1u64 ; Duration :: from_secs (i_am_leader_send_timeout) }, - ), - location_kind: Cluster( - 2, - ), + f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Tee { + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received > f { Some (true) } else { None } }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + input: Tee { + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , u32) , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }), + input: Filter { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (p1b , ballot_num) | p1b . ballot . num == * ballot_num && p1b . ballot . proposer_id == p_id }), + input: CrossSingleton( + Persist( + Inspect { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | println ! ("Proposer received P1b: {:?}" , p1b) }), + input: Tee { + inner: , + }, + }, + ), + Tee { + inner: , + }, + ), + }, + }, + }, + }, + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Tee { + inner: : Filter { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (received_max_ballot , ballot_num) | * received_max_ballot <= Ballot { num : * ballot_num , proposer_id : p_id } }), + input: CrossSingleton( + Tee { + inner: , + }, + Tee { + inner: , + }, + ), + }, + }, + }, + ), + }, }, }, ), }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Tee { - inner: , + f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Source { + source: Interval( + stageleft :: runtime_support :: type_hint :: < core :: time :: Duration > ({ use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_send_timeout = 1u64 ; Duration :: from_secs (i_am_leader_send_timeout) }), + ), + location_kind: Cluster( + 2, + ), }, }, ), @@ -295,7 +244,7 @@ expression: built.ir() input: DeferTick( Union( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , ()) , i32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( Union( @@ -307,18 +256,18 @@ expression: built.ir() input: CrossSingleton( Union( Tee { - inner: : Reduce { + inner: : Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), input: Tee { - inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , LogValue { ballot : Ballot { num : 0 , id : ClusterId :: from_raw (0) } , value : Default :: default () }) }), + inner: : FoldKeyed { + init: stageleft :: runtime_support :: fn0_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , LogValue { ballot : Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } , value : Default :: default () }) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_entry , new_entry | { let same_values = new_entry . value == curr_entry . 1 . value ; let higher_ballot = new_entry . ballot > curr_entry . 1 . ballot ; if same_values { curr_entry . 0 += 1 ; } if higher_ballot { curr_entry . 1 . ballot = new_entry . ballot ; if ! same_values { curr_entry . 0 = 1 ; curr_entry . 1 . value = new_entry . value ; } } } }), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , u32) , std :: collections :: hash_map :: IntoIter < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((_ , p1b) , _) | p1b . accepted . into_iter () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , std :: collections :: hash_map :: IntoIter < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | p1b . accepted . into_iter () }), input: Tee { - inner: , + inner: , }, }, }, @@ -345,7 +294,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: : CycleSource { + inner: : CycleSource { ident: Ident { sym: cycle_4, }, @@ -364,15 +313,15 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (num_payloads , next_slot) | next_slot + num_payloads as i32 }), input: CrossSingleton( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , ()) , usize > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( Tee { - inner: : Fold { + inner: : Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -420,10 +369,10 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), input: Tee { - inner: : Filter { + inner: : Filter { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; | num_payloads | * num_payloads > 0 }), input: Tee { - inner: , + inner: , }, }, }, @@ -432,7 +381,7 @@ expression: built.ir() }, }, Tee { - inner: , + inner: , }, ), }, @@ -441,7 +390,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , ()) , i32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( Tee { - inner: , + inner: , }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), @@ -451,7 +400,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -462,7 +411,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), input: Tee { - inner: , + inner: , }, }, ), @@ -474,7 +423,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( Tee { - inner: , + inner: , }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), @@ -484,7 +433,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -505,18 +454,18 @@ expression: built.ir() input: DeferTick( Difference( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (usize , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , (_count , _p2b)) | slot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _value) | slot }), input: Tee { - inner: : Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (i32 , (usize , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | (_slot , (count , _p2b)) | * count > f }), + inner: : FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload)) , core :: option :: Option < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , value)) | if count > f { Some ((slot , value)) } else { None } }), input: Tee { - inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , P2b { ballot : Ballot { num : 0 , id : ClusterId :: from_raw (0) } , max_ballot : Ballot { num : 0 , id : ClusterId :: from_raw (0) } , slot : 0 , value : Default :: default () }) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | accum , (_sender , p2b) | { accum . 0 += 1 ; accum . 1 = p2b ; } }), + inner: : FoldKeyed { + init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , Default :: default ()) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | accum , value | { accum . 0 += 1 ; accum . 1 = value ; } }), input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , core :: option :: Option < (i32 , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (sender , p2b) | if p2b . ballot == p2b . max_ballot { Some ((p2b . slot , (sender , p2b))) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , core :: option :: Option < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | if p2b . victory { Some (((p2b . slot , p2b . ballot) , p2b . value)) } else { None } }), input: Tee { - inner: : Union( + inner: : Union( Tee { inner: , }, @@ -537,10 +486,10 @@ expression: built.ir() }, }, Tee { - inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (usize , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | (slot , (count , _p2b)) | if count == 2 * f + 1 { Some (slot) } else { None } }), + inner: : FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload)) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , _p2b)) | if count == 2 * f + 1 { Some (slot) } else { None } }), input: Tee { - inner: , + inner: , }, }, }, @@ -556,16 +505,16 @@ expression: built.ir() ), input: DeferTick( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_slot , (sender , p2b)) | (sender , p2b) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_slot , p2b) | p2b }), input: AntiJoin( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , (i32 , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (sender , p2b) | (p2b . slot , (sender , p2b)) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | (p2b . slot , p2b) }), input: Tee { - inner: , + inner: , }, }, Tee { - inner: , + inner: , }, ), }, @@ -589,355 +538,406 @@ expression: built.ir() location_kind: Cluster( 2, ), - input: Network { - from_location: Cluster( - 3, - ), - from_key: None, - to_location: Cluster( - 2, - ), - to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }", - ], - }, + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + input: Network { + from_location: Cluster( + 3, ), - ), - instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }", - ], - }, + from_key: None, + to_location: Cluster( + 2, ), - ), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >)) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((p1a , max_ballot) , (_prev_checkpoint , log)) | (p1a . ballot . id , P1b { ballot : p1a . ballot , max_ballot , accepted : log }) }), - input: CrossSingleton( - CrossSingleton( - Tee { - inner: : Inspect { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | println ! ("Acceptor received P1a: {:?}" , p1a) }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1a) , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), - input: Network { - from_location: Cluster( - 2, - ), - from_key: None, - to_location: Cluster( - 3, - ), - to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1a) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1a > (& data) . unwrap () . into ()) }", - ], - }, + to_key: None, + serialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }", + ], + }, + ), + ), + instantiate_fn: , + deserialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }", + ], + }, + ), + ), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >)) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((p1a , max_ballot) , (_prev_checkpoint , log)) | (p1a . ballot . proposer_id , P1b { ballot : p1a . ballot , max_ballot , accepted : log }) }), + input: CrossSingleton( + CrossSingleton( + Tee { + inner: : Inspect { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | println ! ("Acceptor received P1a: {:?}" , p1a) }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1a) , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + input: Network { + from_location: Cluster( + 2, ), - ), - instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1a > (& b) . unwrap ()) }", - ], - }, + from_key: None, + to_location: Cluster( + 3, ), - ), - input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ballot_num | P1a { ballot : Ballot { num : ballot_num , id : p_id } } }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Tee { - inner: , - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (tokio :: time :: Instant , core :: option :: Option < tokio :: time :: Instant >) , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Tee { - inner: , - }, + to_key: None, + serialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1a) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1a > (& data) . unwrap () . into ()) }", + ], + }, + ), + ), + instantiate_fn: , + deserialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1a > (& b) . unwrap ()) }", + ], + }, + ), + ), + input: FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + input: Inspect { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | _ | println ! ("Proposer leader expired, sending P1a") }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ballot_num | P1a { ballot : Ballot { num : ballot_num , proposer_id : p_id } } }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Tee { + inner: , + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Filter { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; move | latest_received_i_am_leader | { if let Some (latest_received_i_am_leader) = latest_received_i_am_leader { (Instant :: now () . duration_since (* latest_received_i_am_leader)) > Duration :: from_secs (i_am_leader_check_timeout) } else { true } } }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < tokio :: time :: Instant > , ()) , core :: option :: Option < tokio :: time :: Instant > > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Tee { + inner: : Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < tokio :: time :: Instant > > ({ use crate :: __staged :: cluster :: paxos :: * ; | | None }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | latest , _ | { * latest = Some (Instant :: now ()) ; } }), + input: Persist( + Tee { + inner: , + }, + ), + }, + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Filter { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + input: Tee { + inner: , + }, + }, + }, + }, + ), + }, + }, + }, + ), + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Source { + source: Stream( + { use hydroflow_plus :: __staged :: location :: * ; let delay = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout_delay_multiplier = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; Duration :: from_secs ((p_id . raw_id * i_am_leader_check_timeout_delay_multiplier as u32) . into ()) } ; let interval = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; Duration :: from_secs (i_am_leader_check_timeout) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval_at (tokio :: time :: Instant :: now () + delay , interval)) }, + ), + location_kind: Cluster( + 2, + ), + }, + }, + ), }, - ), + }, }, }, }, }, }, }, - }, - Tee { - inner: : Union( - Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), - input: Persist( - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | p1a . ballot }), - input: Tee { - inner: , + Tee { + inner: : Union( + Reduce { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), + input: Persist( + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | p1a . ballot }), + input: Tee { + inner: , + }, }, - }, - ), - }, - Persist( - Source { - source: Iter( - { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; Ballot { num : 0 , id : ClusterId :: from_raw (0) } } ; [e] }, - ), - location_kind: Cluster( - 3, ), }, + Persist( + Source { + source: Iter( + { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } } ; [e] }, + ), + location_kind: Cluster( + 3, + ), + }, + ), ), - ), - }, - ), - Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (- 1 , HashMap :: new ()) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >) , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , (new_checkpoint , p2a) | { if new_checkpoint != - 1 { for slot in * prev_checkpoint .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = new_checkpoint ; } else { if p2a . slot > * prev_checkpoint { match log . get (& p2a . slot) { None => { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } Some (prev_p2a) => { if p2a . ballot > prev_p2a . ballot { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } ; } } } }), - input: Persist( - Union( - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < (i32 , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some ((- 1 , p2a)) } else { None } }), - input: CrossSingleton( - Tee { - inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), - input: Network { - from_location: Cluster( - 2, - ), - from_key: None, - to_location: Cluster( - 3, - ), - to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }", - ], - }, + }, + ), + Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (- 1 , HashMap :: new ()) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >) , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , (new_checkpoint , p2a) | { if new_checkpoint != - 1 { for slot in * prev_checkpoint .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = new_checkpoint ; } else { if p2a . slot > * prev_checkpoint { match log . get (& p2a . slot) { None => { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } Some (prev_p2a) => { if p2a . ballot > prev_p2a . ballot { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } ; } } } }), + input: Persist( + Union( + FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < (i32 , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some ((- 1 , p2a)) } else { None } }), + input: CrossSingleton( + Tee { + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + input: Network { + from_location: Cluster( + 2, ), - ), - instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }", - ], - }, + from_key: None, + to_location: Cluster( + 3, ), - ), - input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Union( - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Union( - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ((slot , (count , entry)) , ballot_num) | if count <= f as u32 { Some (P2a { ballot : Ballot { num : ballot_num , id : p_id , } , slot , value : entry . value , }) } else { None } }), - input: CrossSingleton( - Tee { - inner: , - }, - Tee { - inner: , - }, - ), - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (slot , ballot_num) | P2a { ballot : Ballot { num : ballot_num , id : p_id } , slot , value : Default :: default () } }), - input: CrossSingleton( - Difference( - FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: ops :: Range < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | 0 .. max_slot }), - input: Tee { - inner: , - }, + to_key: None, + serialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }", + ], + }, + ), + ), + instantiate_fn: , + deserialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }", + ], + }, + ), + ), + input: FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Union( + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Union( + FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ((slot , (count , entry)) , ballot_num) | if count <= f as u32 { Some (P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id , } , slot , value : entry . value , }) } else { None } }), + input: CrossSingleton( + Tee { + inner: , }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), - input: Tee { - inner: , + Tee { + inner: , + }, + ), + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (slot , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot , value : Default :: default () } }), + input: CrossSingleton( + Difference( + FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: ops :: Range < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | 0 .. max_slot }), + input: Tee { + inner: , + }, + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), + input: Tee { + inner: , + }, }, + ), + Tee { + inner: , }, ), - Tee { - inner: , + }, + ), + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), + input: Filter { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + input: Tee { + inner: , + }, }, - ), + }, }, ), - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), - input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), - input: Tee { + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , i32) , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (((index , payload) , next_slot) , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot : next_slot + index as i32 , value : payload } }), + input: CrossSingleton( + CrossSingleton( + Enumerate( + Tee { inner: , }, - }, - }, - }, - ), - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , i32) , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (((index , payload) , next_slot) , ballot_num) | P2a { ballot : Ballot { num : ballot_num , id : p_id } , slot : next_slot + index as i32 , value : payload } }), - input: CrossSingleton( - CrossSingleton( - Enumerate( + ), Tee { - inner: , + inner: , }, ), Tee { - inner: , + inner: , }, ), - Tee { - inner: , - }, - ), + }, + ), + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), + input: Tee { + inner: , + }, }, ), - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), - input: Tee { - inner: , - }, - }, - ), + }, }, }, }, }, - }, - Tee { - inner: , - }, - ), - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | min_seq | (min_seq , P2a { ballot : Ballot { num : 0 , id : ClusterId :: from_raw (0) } , slot : - 1 , value : Default :: default () }) }), - input: Delta( - Union( - Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new < * curr { * curr = new ; } } }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_sender , seq) | seq }), + Tee { + inner: , + }, + ), + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | min_seq | (min_seq , P2a { ballot : Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } , slot : - 1 , value : Default :: default () }) }), + input: Delta( + Union( + Reduce { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new < * curr { * curr = new ; } } }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , ()) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Tee { - inner: : ReduceKeyed { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_seq , seq | { if seq > * curr_seq { * curr_seq = seq ; } } }), - input: Persist( - Network { - from_location: Cluster( - 1, - ), - from_key: None, - to_location: Cluster( - 3, - ), - to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , i32) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < i32 > (& data) . unwrap () . into ()) }", - ], - }, + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_sender , seq) | seq }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , ()) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Tee { + inner: : ReduceKeyed { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_seq , seq | { if seq > * curr_seq { * curr_seq = seq ; } } }), + input: Persist( + Network { + from_location: Cluster( + 1, ), - ), - instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < i32 > (& b) . unwrap ()) }", - ], - }, + from_key: None, + to_location: Cluster( + 3, ), - ), - input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), - input: CycleSource { - ident: Ident { - sym: cycle_0, - }, - location_kind: Cluster( - 1, + to_key: None, + serialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , i32) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < i32 > (& data) . unwrap () . into ()) }", + ], + }, ), + ), + instantiate_fn: , + deserialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < i32 > (& b) . unwrap ()) }", + ], + }, + ), + ), + input: FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + input: CycleSource { + ident: Ident { + sym: cycle_0, + }, + location_kind: Cluster( + 1, + ), + }, }, }, - }, - ), + ), + }, }, - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), - input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received == f + 1 { Some (true) } else { None } }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), - input: Tee { - inner: , + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), + input: FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received == f + 1 { Some (true) } else { None } }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + input: Tee { + inner: , + }, }, }, }, - }, - ), + ), + }, }, }, - }, - Persist( - Source { - source: Iter( - { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; - 1 } ; [e] }, - ), - location_kind: Cluster( - 3, - ), - }, + Persist( + Source { + source: Iter( + { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; - 1 } ; [e] }, + ), + location_kind: Cluster( + 3, + ), + }, + ), ), ), - ), - }, + }, + ), ), - ), - }, - ), + }, + ), + }, }, }, }, @@ -948,46 +948,49 @@ expression: built.ir() location_kind: Cluster( 2, ), - input: Network { - from_location: Cluster( - 3, - ), - from_key: None, - to_location: Cluster( - 2, - ), - to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }", - ], - }, + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + input: Network { + from_location: Cluster( + 3, ), - ), - instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }", - ], - }, + from_key: None, + to_location: Cluster( + 2, ), - ), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | (p2a . ballot . id , P2b { ballot : p2a . ballot , max_ballot , slot : p2a . slot , value : p2a . value }) }), - input: CrossSingleton( - Tee { - inner: , - }, - Tee { - inner: , - }, + to_key: None, + serialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }", + ], + }, + ), ), + instantiate_fn: , + deserialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }", + ], + }, + ), + ), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | (p2a . ballot . proposer_id , P2b { victory : p2a . ballot == max_ballot , ballot : p2a . ballot , slot : p2a . slot , value : p2a . value }) }), + input: CrossSingleton( + Tee { + inner: , + }, + Tee { + inner: , + }, + ), + }, }, }, }, @@ -1005,10 +1008,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq > * highest_seq }), input: CrossSingleton( Tee { - inner: : Sort( + inner: : Sort( Union( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -1045,10 +1048,10 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (slot , data) | ReplicaPayload { seq : slot , key : data . key , value : data . value } }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (usize , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_slot , (_count , p2b)) | (p2b . slot , p2b . value) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , value) | (slot , value) }), input: AntiJoin( Tee { - inner: , + inner: , }, CycleSource { ident: Ident { @@ -1077,12 +1080,12 @@ expression: built.ir() ), }, Tee { - inner: : Fold { + inner: : Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < i32 > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | - 1 }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | filled_slot , (sorted_payload , highest_seq) | { let next_slot = std :: cmp :: max (* filled_slot , highest_seq) ; * filled_slot = if sorted_payload . seq == next_slot + 1 { sorted_payload . seq } else { * filled_slot } ; } }), input: CrossSingleton( Tee { - inner: , + inner: , }, Union( CycleSource { @@ -1119,23 +1122,23 @@ expression: built.ir() ), input: DeferTick( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_kv_store , highest_seq) | highest_seq }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | (HashMap :: < u32 , String > :: new () , - 1) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | state , payload | { let kv_store = & mut state . 0 ; let last_seq = & mut state . 1 ; kv_store . insert (payload . key , payload . value) ; debug_assert ! (payload . seq == * last_seq + 1 , "Hole in log between seq {} and {}" , * last_seq , payload . seq) ; * last_seq = payload . seq ; } }), input: Persist( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , _) | { sorted_payload } }), input: Filter { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq <= * highest_seq }), input: CrossSingleton( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1156,7 +1159,7 @@ expression: built.ir() ), input: DeferTick( Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , i32) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let checkpoint_frequency = 1usize ; move | (max_checkpointed_seq , new_highest_seq) | if new_highest_seq - max_checkpointed_seq >= checkpoint_frequency as i32 { Some (new_highest_seq) } else { None } }), input: CrossSingleton( Union( @@ -1185,7 +1188,7 @@ expression: built.ir() ), ), Tee { - inner: , + inner: , }, ), }, @@ -1200,7 +1203,7 @@ expression: built.ir() 1, ), input: Tee { - inner: , + inner: , }, }, CycleSink { @@ -1213,7 +1216,7 @@ expression: built.ir() input: DeferTick( AntiJoin( Tee { - inner: : Union( + inner: : Union( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) , (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sender , replica_payload) | (replica_payload . key , sender) }), input: Network { @@ -1249,7 +1252,7 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (ClusterId :: from_raw (payload . value . parse :: < u32 > () . unwrap ()) , payload) }), input: Tee { - inner: , + inner: , }, }, }, @@ -1265,13 +1268,13 @@ expression: built.ir() ), }, Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , usize) , core :: option :: Option < u32 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let f = 1usize ; move | (key , count) | { if count == f + 1 { Some (key) } else { None } } }), input: FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | 0 }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | curr_count , _sender | { * curr_count += 1 ; } }), input: Tee { - inner: , + inner: , }, }, }, @@ -1292,7 +1295,7 @@ expression: built.ir() input: Union( Union( Tee { - inner: : CycleSource { + inner: : CycleSource { ident: Ident { sym: cycle_2, }, @@ -1306,9 +1309,9 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: time :: SystemTime > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | SystemTime :: now () }), input: Tee { - inner: : Delta( + inner: : Delta( Tee { - inner: : Reduce { + inner: : Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), input: Persist( Inspect { @@ -1348,13 +1351,13 @@ expression: built.ir() input: FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (__hydroflow_plus_cluster_ids_0) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , u32) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (_is_leader , ballot_num) | Ballot { num : ballot_num , id : p_id } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , u32) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (_is_leader , ballot_num) | Ballot { num : ballot_num , proposer_id : p_id } }), input: CrossSingleton( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( Tee { - inner: , + inner: , }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), @@ -1364,7 +1367,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -1389,10 +1392,10 @@ expression: built.ir() }, ), Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , (usize , std :: time :: SystemTime) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | key | (key as usize , SystemTime :: now ()) }), input: Tee { - inner: , + inner: , }, }, }, @@ -1405,7 +1408,7 @@ expression: built.ir() input: CrossSingleton( CrossSingleton( Tee { - inner: : Source { + inner: : Source { source: Interval( { use crate :: __staged :: cluster :: paxos_bench :: * ; Duration :: from_secs (1) }, ), @@ -1423,10 +1426,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (std :: time :: SystemTime , std :: time :: SystemTime)) , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time) . unwrap () . as_micros ()) }), input: Join( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1434,7 +1437,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < () , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }), input: Tee { - inner: , + inner: , }, }, ), @@ -1456,7 +1459,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , u32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, Map { @@ -1467,7 +1470,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -1479,7 +1482,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < () , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | (0 , true) }), input: Tee { - inner: , + inner: , }, }, ), @@ -1499,17 +1502,17 @@ expression: built.ir() FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; let num_clients_per_node = 1usize ; move | leader_ballot | (0 .. num_clients_per_node) . map (move | i | (leader_ballot . leader_id () , ClientPayload { key : i as u32 , value : c_id . raw_id . to_string () })) }), input: Tee { - inner: , + inner: , }, }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (key , leader_ballot) | (leader_ballot . leader_id () , ClientPayload { key , value : c_id . raw_id . to_string () }) }), input: CrossSingleton( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), },