From 5331c891c61a7acaa2c4bbd368bffa82f0b0ea93 Mon Sep 17 00:00:00 2001 From: Shadaj Laddad Date: Tue, 8 Oct 2024 13:26:20 -0700 Subject: [PATCH] refactor(paxos): rearrange code to reflect order of phases --- hydroflow_plus_test/src/cluster/paxos.rs | 925 +++++++++--------- ...cluster__paxos_bench__tests__paxos_ir.snap | 688 ++++++------- 2 files changed, 814 insertions(+), 799 deletions(-) diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index 7ebdc8160afb..94a8fb5cfa74 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -97,7 +97,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( .source_iter(q!(["Proposers say hello"])) .for_each(q!(|s| println!("{}", s))); - let (a_to_proposers_p2b_complete_cycle, a_to_proposers_p2b) = + let (a_to_proposers_p2b_complete_cycle, a_to_proposers_p2b_forward_reference) = proposers.forward_ref::>(); let (a_log_complete_cycle, a_log_forward_reference) = acceptors.tick_forward_ref::>(); @@ -110,11 +110,11 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( i_am_leader_send_timeout, i_am_leader_check_timeout, i_am_leader_check_timeout_delay_multiplier, - a_to_proposers_p2b, + a_to_proposers_p2b_forward_reference, a_log_forward_reference, ); - let (p_to_clients_new_leader_elected, p_to_replicas, a_log, a_to_proposers_p2b_new) = + let (p_to_clients_new_leader_elected, p_to_replicas, a_log, a_to_proposers_p2b) = sequence_payload( &proposers, &acceptors, @@ -130,7 +130,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( ); a_log_complete_cycle.complete(a_log.clone()); - a_to_proposers_p2b_complete_cycle.complete(a_to_proposers_p2b_new); + a_to_proposers_p2b_complete_cycle.complete(a_to_proposers_p2b); ( proposers, @@ -140,85 +140,6 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( ) } -#[expect( - clippy::type_complexity, - clippy::too_many_arguments, - reason = "internal paxos code // TODO" -)] -fn sequence_payload<'a, P: PaxosPayload, R>( - proposers: &Cluster<'a, Proposer>, - acceptors: &Cluster<'a, Acceptor>, - c_to_proposers: Stream>, - r_to_acceptors_checkpoint: impl FnOnce( - &Cluster<'a, Acceptor>, - ) -> Stream< - (ClusterId, i32), - Unbounded, - NoTick, - Cluster<'a, Acceptor>, - >, - - p_ballot_num: Singleton>, - p_is_leader: Optional>, - p_max_slot: Optional>, - - p_log_to_try_commit: Stream, Bounded, Tick, Cluster<'a, Proposer>>, - p_log_holes: Stream, Bounded, Tick, Cluster<'a, Proposer>>, - f: usize, - - a_max_ballot: Singleton>, -) -> ( - Stream>, - Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>>, - Singleton<(i32, HashMap>), Bounded, Tick, Cluster<'a, Acceptor>>, - Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, -) { - let p_id = proposers.self_id(); - let (p_next_slot, p_to_acceptors_p2a) = p_p2a( - proposers, - p_max_slot, - c_to_proposers, - p_ballot_num.clone(), - p_log_to_try_commit, - p_log_holes, - p_is_leader.clone(), - acceptors, - ); - - // Tell clients that leader election has completed and they can begin sending messages - let p_to_clients_new_leader_elected = p_is_leader.clone() - .continue_unless(p_next_slot) - .cross_singleton(p_ballot_num) - .map(q!(move |(_is_leader, ballot_num)| Ballot { num: ballot_num, proposer_id: p_id})) // Only tell the clients once when leader election concludes - .all_ticks(); - - // Acceptors. - acceptors - .source_iter(q!(["Acceptors say hello"])) - .for_each(q!(|s| println!("{}", s))); - let r_to_acceptors_checkpoint = r_to_acceptors_checkpoint(acceptors); - - // p_to_acceptors_p2a.clone().for_each(q!(|p2a: P2a| println!("Acceptor received P2a: {:?}", p2a))); - let (a_log, a_to_proposers_p2b) = acceptor_p2( - a_max_ballot.clone(), - p_to_acceptors_p2a, - r_to_acceptors_checkpoint, - proposers, - acceptors, - f, - ); - - // End tell clients that leader election has completed - let p_to_replicas = p_p2b(proposers, a_to_proposers_p2b.clone(), f); - - ( - p_to_clients_new_leader_elected, - p_to_replicas, - a_log, - a_to_proposers_p2b, - ) -} - #[expect( clippy::type_complexity, clippy::too_many_arguments, @@ -245,32 +166,25 @@ fn leader_election<'a, P: PaxosPayload>( proposers.forward_ref::>(); let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader) = proposers.forward_ref::>(); + let (p_is_leader_complete_cycle, p_is_leader) = + proposers.tick_forward_ref::>(); // a_to_proposers_p2b.clone().for_each(q!(|(_, p2b): (u32, P2b)| println!("Proposer received P2b: {:?}", p2b))); // p_to_proposers_i_am_leader.clone().for_each(q!(|ballot: Ballot| println!("Proposer received I am leader: {:?}", ballot))); // c_to_proposers.clone().for_each(q!(|payload: ClientPayload| println!("Client sent proposer payload: {:?}", payload))); let p_received_max_ballot = p_max_ballot( proposers, - a_to_proposers_p1b.clone(), + a_to_proposers_p1b, a_to_proposers_p2b, p_to_proposers_i_am_leader.clone(), ); let (p_ballot_num, p_has_largest_ballot) = p_ballot_calc(proposers, p_received_max_ballot.latest_tick()); - let (p_is_leader, p_log_to_try_commit, p_max_slot, p_log_holes) = p_p1b( - proposers, - a_to_proposers_p1b.inspect(q!(|p1b| println!("Proposer received P1b: {:?}", p1b))), - p_ballot_num.clone(), - p_has_largest_ballot, - f, - ); - let (p_to_proposers_i_am_leader_from_others, p_to_acceptors_p1a) = p_p1a( p_ballot_num.clone(), p_is_leader.clone(), proposers, - p_to_proposers_i_am_leader, acceptors, i_am_leader_send_timeout, i_am_leader_check_timeout, @@ -288,12 +202,17 @@ fn leader_election<'a, P: PaxosPayload>( proposer_id: ClusterId::from_raw(0) }))); - a_to_proposers_p1b_complete_cycle.complete(acceptor_p1( - p_to_acceptors_p1a, - &a_max_ballot, - a_log, + let a_to_proposers_p1b = acceptor_p1(p_to_acceptors_p1a, &a_max_ballot, a_log, proposers); + a_to_proposers_p1b_complete_cycle.complete(a_to_proposers_p1b.clone()); + + let (p_is_leader, p_log_to_try_commit, p_max_slot, p_log_holes) = p_p1b( proposers, - )); + a_to_proposers_p1b.inspect(q!(|p1b| println!("Proposer received P1b: {:?}", p1b))), + p_ballot_num.clone(), + p_has_largest_ballot, + f, + ); + p_is_leader_complete_cycle.complete(p_is_leader.clone()); ( p_ballot_num, @@ -305,282 +224,177 @@ fn leader_election<'a, P: PaxosPayload>( ) } -#[derive(Clone)] -enum CheckpointOrP2a

{ - Checkpoint(i32), - P2a(P2a

), +// Proposer logic to calculate the largest ballot received so far. +fn p_max_ballot<'a, P: PaxosPayload>( + proposers: &Cluster<'a, Proposer>, + a_to_proposers_p1b: Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, + a_to_proposers_p2b: Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, + p_to_proposers_i_am_leader: Stream>, +) -> Singleton> { + let p_received_p1b_ballots = a_to_proposers_p1b.clone().map(q!(|p1b| p1b.max_ballot)); + let p_received_p2b_ballots = a_to_proposers_p2b.clone().map(q!(|p2b| p2b.ballot)); + p_received_p1b_ballots + .union(p_received_p2b_ballots) + .union(p_to_proposers_i_am_leader) + .max() + .unwrap_or(proposers.singleton(q!(Ballot { + num: 0, + proposer_id: ClusterId::from_raw(0) + }))) } +// Proposer logic to calculate the next ballot number. Expects p_received_max_ballot, the largest ballot received so far. Outputs streams: ballot_num, and has_largest_ballot, which only contains a value if we have the largest ballot. #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] -fn acceptor_p2<'a, P: PaxosPayload, R>( - a_max_ballot: Singleton>, - p_to_acceptors_p2a: Stream, Unbounded, NoTick, Cluster<'a, Acceptor>>, - r_to_acceptors_checkpoint: Stream< - (ClusterId, i32), - Unbounded, - NoTick, - Cluster<'a, Acceptor>, - >, +fn p_ballot_calc<'a>( proposers: &Cluster<'a, Proposer>, - acceptors: &Cluster<'a, Acceptor>, - f: usize, + p_received_max_ballot: Singleton>, ) -> ( - Singleton<(i32, HashMap>), Bounded, Tick, Cluster<'a, Acceptor>>, - Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, + Singleton>, + Optional<(Ballot, u32), Bounded, Tick, Cluster<'a, Proposer>>, ) { - // Get the latest checkpoint sequence per replica - let a_checkpoint_largest_seqs = - r_to_acceptors_checkpoint - .tick_prefix() - .reduce_keyed(q!(|curr_seq, seq| { - if seq > *curr_seq { - *curr_seq = seq; - } - })); - let a_checkpoints_quorum_reached = a_checkpoint_largest_seqs.clone().count().filter_map(q!( - move |num_received| if num_received == f + 1 { - Some(true) - } else { - None - } - )); - // Find the smallest checkpoint seq that everyone agrees to, track whenever it changes - let a_new_checkpoint = a_checkpoint_largest_seqs - .continue_if(a_checkpoints_quorum_reached) - .map(q!(|(_sender, seq)| seq)) - .min() - .unwrap_or(acceptors.singleton(q!(-1)).latest_tick()) - .delta() - .map(q!(|min_seq| CheckpointOrP2a::Checkpoint(min_seq))); - // .inspect(q!(|(min_seq, p2a): &(i32, P2a)| println!("Acceptor new checkpoint: {:?}", min_seq))); + let p_id = proposers.self_id(); + let (p_ballot_num_complete_cycle, p_ballot_num) = + proposers.tick_cycle_with_initial(proposers.singleton(q!(0)).latest_tick()); - let a_p2as_to_place_in_log = p_to_acceptors_p2a + let p_new_ballot_num = p_received_max_ballot .clone() - .tick_batch() - .cross_singleton(a_max_ballot.clone().latest_tick()) // Don't consider p2as if the current ballot is higher - .filter_map(q!(|(p2a, max_ballot)| - if p2a.ballot >= max_ballot { - Some(CheckpointOrP2a::P2a(p2a)) + .cross_singleton(p_ballot_num.clone()) + .map(q!(move |(received_max_ballot, ballot_num)| { + if received_max_ballot + > (Ballot { + num: ballot_num, + proposer_id: p_id, + }) + { + received_max_ballot.num + 1 } else { - None + ballot_num } - )); - let a_log = a_p2as_to_place_in_log - .union(a_new_checkpoint.into_stream()) - .persist() - .fold( - q!(|| (-1, HashMap::new())), - q!(|(prev_checkpoint, log), checkpoint_or_p2a| { - match checkpoint_or_p2a { - CheckpointOrP2a::Checkpoint(new_checkpoint) => { - // This is a checkpoint message. Delete all entries up to the checkpoint - for slot in *prev_checkpoint..new_checkpoint { - log.remove(&slot); - } - *prev_checkpoint = new_checkpoint; - } - CheckpointOrP2a::P2a(p2a) => { - // This is a regular p2a message. Insert it into the log if it is not checkpointed and has a higher ballot than what was there before - if p2a.slot > *prev_checkpoint - && log - .get(&p2a.slot) - .map(|prev_p2a: &LogValue<_>| p2a.ballot > prev_p2a.ballot) - .unwrap_or(true) - { - log.insert( - p2a.slot, - LogValue { - ballot: p2a.ballot, - value: p2a.value, - }, - ); - } - } + })); + p_ballot_num_complete_cycle.complete_next_tick(p_new_ballot_num); + + let p_has_largest_ballot = p_received_max_ballot + .clone() + .cross_singleton(p_ballot_num.clone()) + .filter(q!( + move |(received_max_ballot, ballot_num)| *received_max_ballot + <= Ballot { + num: *ballot_num, + proposer_id: p_id } - }), - ); + )); - let a_to_proposers_p2b_new = p_to_acceptors_p2a - .tick_batch() - .cross_singleton(a_max_ballot.clone().latest_tick()) - .map(q!(|(p2a, max_ballot)| ( - p2a.ballot.proposer_id, - P2b { - victory: p2a.ballot == max_ballot, - ballot: p2a.ballot, - slot: p2a.slot, - value: p2a.value - } - ))) - .all_ticks() - .send_bincode_interleaved(proposers); - (a_log, a_to_proposers_p2b_new) + // End stable leader election + (p_ballot_num, p_has_largest_ballot) } -#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] -fn acceptor_p1<'a, P: PaxosPayload>( - p_to_acceptors_p1a: Stream>, - a_max_ballot: &Singleton>, - a_log: Singleton<(i32, HashMap>), Bounded, Tick, Cluster<'a, Acceptor>>, - proposers: &Cluster<'a, Proposer>, -) -> Stream, Unbounded, NoTick, Cluster<'a, Proposer>> { - p_to_acceptors_p1a - .tick_batch() - .cross_singleton(a_max_ballot.clone().latest_tick()) - .cross_singleton(a_log) - .map(q!(|((p1a, max_ballot), (_prev_checkpoint, log))| ( - p1a.ballot.proposer_id, - P1b { - ballot: p1a.ballot, - max_ballot, - accepted: log - } - ))) - .all_ticks() - .send_bincode_interleaved(proposers) -} +fn p_leader_expired<'a>( + p_to_proposers_i_am_leader: Stream>, + p_is_leader: Optional>, + i_am_leader_check_timeout: u64, // How often to check if heartbeat expired +) -> Optional, Bounded, Tick, Cluster<'a, Proposer>> { + let p_latest_received_i_am_leader = p_to_proposers_i_am_leader.clone().fold( + q!(|| None), + q!(|latest, _| { + // Note: May want to check received ballot against our own? + *latest = Some(Instant::now()); + }), + ); -fn p_p2b<'a, P: PaxosPayload>( - proposers: &Cluster<'a, Proposer>, - a_to_proposers_p2b: Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, - f: usize, -) -> Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>> { - let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposers.tick_cycle(); - let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposers.tick_cycle(); - let p_p2b = a_to_proposers_p2b - .clone() - .tick_batch() - .union(p_persisted_p2bs); - let p_count_matching_p2bs = p_p2b - .clone() - .filter_map(q!(|p2b| if p2b.victory { - // Only consider p2bs where max ballot = ballot, which means that no one preempted us - Some(((p2b.slot, p2b.ballot), p2b.value)) - } else { - None - })) - .fold_keyed( - q!(|| (0, Default::default())), - q!(|accum, value| { - // TODO(shadaj): why is sender unused? should we de-dup? - accum.0 += 1; - accum.1 = value; - }), - ); - let p_p2b_quorum_reached = - p_count_matching_p2bs - .clone() - .filter_map(q!(move |((slot, _ballot), (count, value))| if count > f { - Some((slot, value)) - } else { - None - })); - let p_to_replicas = p_p2b_quorum_reached + p_latest_received_i_am_leader .clone() - .anti_join(p_broadcasted_p2b_slots) // Only tell the replicas about committed values once - .map(q!(|(slot, value)| (slot, value))) - .all_ticks(); - - let p_p2b_all_commit_slots = - p_count_matching_p2bs.clone().filter_map(q!( - move |((slot, _ballot), (count, _p2b))| if count == 2 * f + 1 { - Some(slot) + .latest_tick() + .continue_unless(p_is_leader) + .filter(q!(move |latest_received_i_am_leader| { + if let Some(latest_received_i_am_leader) = latest_received_i_am_leader { + (Instant::now().duration_since(*latest_received_i_am_leader)) + > Duration::from_secs(i_am_leader_check_timeout) } else { - None + true } - )); - // p_p2b_all_commit_slots.inspect(q!(|slot: i32| println!("Proposer slot all received: {:?}", slot))); - let p_broadcasted_p2b_slots_new = p_p2b_quorum_reached - .clone() - .map(q!(|(slot, _value)| slot)) - .filter_not_in(p_p2b_all_commit_slots.clone()); - // p_broadcasted_p2b_slots_new.inspect(q!(|slot: i32| println!("Proposer slot broadcasted: {:?}", slot))); - p_broadcasted_p2b_slots_complete_cycle.complete_next_tick(p_broadcasted_p2b_slots_new); - let p_persisted_p2bs_new = p_p2b - .clone() - .map(q!(|p2b| (p2b.slot, p2b))) - .anti_join(p_p2b_all_commit_slots.clone()) - .map(q!(|(_slot, p2b)| p2b)); - // TOOD: only persist if we are the leader - // p_persisted_p2bs_new.inspect(q!(|(sender, p2b): (u32, P2b)| println!("Proposer persisting p2b: {:?}", p2b))); - p_persisted_p2bs_complete_cycle.complete_next_tick(p_persisted_p2bs_new); - p_to_replicas + })) } -// Proposer logic to send p2as, outputting the next slot and the p2as to send to acceptors. -#[expect( - clippy::type_complexity, - clippy::too_many_arguments, - reason = "internal paxos code // TODO" -)] -fn p_p2a<'a, P: PaxosPayload>( - proposers: &Cluster<'a, Proposer>, - p_max_slot: Optional>, - c_to_proposers: Stream>, +// Proposer logic to send "I am leader" messages periodically to other proposers, or send p1a to acceptors if other leaders expired. +#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] +fn p_p1a<'a>( p_ballot_num: Singleton>, - p_log_to_try_commit: Stream, Bounded, Tick, Cluster<'a, Proposer>>, - p_log_holes: Stream, Bounded, Tick, Cluster<'a, Proposer>>, p_is_leader: Optional>, + proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, + i_am_leader_send_timeout: u64, // How often to heartbeat + i_am_leader_check_timeout: u64, // How often to check if heartbeat expired + i_am_leader_check_timeout_delay_multiplier: usize, /* Initial delay, multiplied by proposer pid, to stagger proposers checking for timeouts */ ) -> ( - Optional>, - Stream, Unbounded, NoTick, Cluster<'a, Acceptor>>, + Stream>, + Stream>, ) { let p_id = proposers.self_id(); - let (p_next_slot_complete_cycle, p_next_slot) = - proposers.tick_cycle::>(); - let p_next_slot_after_reconciling_p1bs = p_max_slot - .unwrap_or(proposers.singleton(q!(-1)).latest_tick()) - // .inspect(q!(|max_slot| println!("{} p_max_slot: {:?}", context.current_tick(), max_slot))) - .continue_unless(p_next_slot.clone()) - .map(q!(|max_slot| max_slot + 1)); - - // Send p2as - let p_indexed_payloads = c_to_proposers + let p_to_proposers_i_am_leader = p_is_leader .clone() - .tick_batch() - .enumerate() - .cross_singleton(p_next_slot.clone()) - // .inspect(q!(|next| println!("{} p_indexed_payloads next slot: {}", context.current_tick(), next)))) - .cross_singleton(p_ballot_num.clone()) - // .inspect(q!(|ballot_num| println!("{} p_indexed_payloads ballot_num: {}", context.current_tick(), ballot_num)))) - .map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { ballot: Ballot { num: ballot_num, proposer_id: p_id }, slot: next_slot + index as i32, value: payload })); - // .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a))); - let p_to_acceptors_p2a = p_log_to_try_commit - .union(p_log_holes) - .continue_unless(p_next_slot.clone()) // Only resend p1b stuff once. Once it's resent, next_slot will exist. - .union(p_indexed_payloads) - .continue_if(p_is_leader.clone()) + .then(p_ballot_num.clone()) + .latest() + .sample_every(q!(Duration::from_secs(i_am_leader_send_timeout))) + .map(q!(move |ballot_num| Ballot { + num: ballot_num, + proposer_id: p_id + })) + .broadcast_bincode_interleaved(proposers); + + let p_leader_expired = p_leader_expired( + p_to_proposers_i_am_leader.clone(), + p_is_leader, + i_am_leader_check_timeout, + ); + + let p_id = proposers.self_id(); + + // Add random delay depending on node ID so not everyone sends p1a at the same time + let p_to_acceptors_p1a = p_leader_expired + .then(p_ballot_num) + .continue_if( + proposers + .source_interval_delayed( + q!(Duration::from_secs( + (p_id.raw_id * i_am_leader_check_timeout_delay_multiplier as u32).into() + )), + q!(Duration::from_secs(i_am_leader_check_timeout)), + ) + .latest_tick(), + ) + .map(q!(move |ballot_num| P1a { + ballot: Ballot { + num: ballot_num, + proposer_id: p_id + } + })) .all_ticks() + .inspect(q!(|_| println!("Proposer leader expired, sending P1a"))) .broadcast_bincode_interleaved(acceptors); + (p_to_proposers_i_am_leader, p_to_acceptors_p1a) +} - let p_num_payloads = c_to_proposers.clone().tick_batch().count(); - let p_exists_payloads = p_num_payloads - .clone() - .filter(q!(|num_payloads| *num_payloads > 0)); - let p_next_slot_after_sending_payloads = p_num_payloads - .continue_if(p_exists_payloads.clone()) - .clone() - .cross_singleton(p_next_slot.clone()) - .map(q!( - |(num_payloads, next_slot)| next_slot + num_payloads as i32 - )); - let p_next_slot_if_no_payloads = p_next_slot.clone().continue_unless(p_exists_payloads); - let p_new_next_slot_calculated = p_next_slot_after_reconciling_p1bs - // .inspect(q!(|slot| println!("{} p_new_next_slot_after_reconciling_p1bs: {:?}", context.current_tick(), slot))) - .union(p_next_slot_after_sending_payloads) - // .inspect(q!(|slot| println!("{} p_next_slot_after_sending_payloads: {:?}", context.current_tick(), slot)))) - .union(p_next_slot_if_no_payloads) - // .inspect(q!(|slot| println!("{} p_next_slot_if_no_payloads: {:?}", context.current_tick(), slot)))) - .continue_if(p_is_leader.clone()); - let p_new_next_slot_default = p_is_leader // Default next slot to 0 if there haven't been any payloads at all - .clone() - .continue_unless(p_new_next_slot_calculated.clone()) - .map(q!(|_| 0)); - // .inspect(q!(|slot| println!("{} p_new_next_slot_default: {:?}", context.current_tick(), slot))); - let p_new_next_slot = p_new_next_slot_calculated.union(p_new_next_slot_default); - p_next_slot_complete_cycle.complete_next_tick(p_new_next_slot); - (p_next_slot, p_to_acceptors_p2a) +#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] +fn acceptor_p1<'a, P: PaxosPayload>( + p_to_acceptors_p1a: Stream>, + a_max_ballot: &Singleton>, + a_log: Singleton<(i32, HashMap>), Bounded, Tick, Cluster<'a, Acceptor>>, + proposers: &Cluster<'a, Proposer>, +) -> Stream, Unbounded, NoTick, Cluster<'a, Proposer>> { + p_to_acceptors_p1a + .tick_batch() + .cross_singleton(a_max_ballot.clone().latest_tick()) + .cross_singleton(a_log) + .map(q!(|((p1a, max_ballot), (_prev_checkpoint, log))| ( + p1a.ballot.proposer_id, + P1b { + ballot: p1a.ballot, + max_ballot, + accepted: log + } + ))) + .all_ticks() + .send_bincode_interleaved(proposers) } // Proposer logic for processing p1bs, determining if the proposer is now the leader, which uncommitted messages to commit, what the maximum slot is in the p1bs, and which no-ops to commit to fill log holes. @@ -675,157 +489,336 @@ fn p_p1b<'a, P: PaxosPayload>( (p_is_leader, p_log_to_try_commit, p_max_slot, p_log_holes) } -// Proposer logic to calculate the largest ballot received so far. -fn p_max_ballot<'a, P: PaxosPayload>( +#[expect( + clippy::type_complexity, + clippy::too_many_arguments, + reason = "internal paxos code // TODO" +)] +fn sequence_payload<'a, P: PaxosPayload, R>( proposers: &Cluster<'a, Proposer>, - a_to_proposers_p1b: Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, - a_to_proposers_p2b: Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, - p_to_proposers_i_am_leader: Stream>, -) -> Singleton> { - let p_received_p1b_ballots = a_to_proposers_p1b.clone().map(q!(|p1b| p1b.max_ballot)); - let p_received_p2b_ballots = a_to_proposers_p2b.clone().map(q!(|p2b| p2b.ballot)); - p_received_p1b_ballots - .union(p_received_p2b_ballots) - .union(p_to_proposers_i_am_leader) - .max() - .unwrap_or(proposers.singleton(q!(Ballot { - num: 0, - proposer_id: ClusterId::from_raw(0) - }))) -} + acceptors: &Cluster<'a, Acceptor>, + c_to_proposers: Stream>, + r_to_acceptors_checkpoint: impl FnOnce( + &Cluster<'a, Acceptor>, + ) -> Stream< + (ClusterId, i32), + Unbounded, + NoTick, + Cluster<'a, Acceptor>, + >, -// Proposer logic to calculate the next ballot number. Expects p_received_max_ballot, the largest ballot received so far. Outputs streams: ballot_num, and has_largest_ballot, which only contains a value if we have the largest ballot. -#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] -fn p_ballot_calc<'a>( - proposers: &Cluster<'a, Proposer>, - p_received_max_ballot: Singleton>, + p_ballot_num: Singleton>, + p_is_leader: Optional>, + p_max_slot: Optional>, + + p_log_to_try_commit: Stream, Bounded, Tick, Cluster<'a, Proposer>>, + p_log_holes: Stream, Bounded, Tick, Cluster<'a, Proposer>>, + f: usize, + + a_max_ballot: Singleton>, ) -> ( - Singleton>, - Optional<(Ballot, u32), Bounded, Tick, Cluster<'a, Proposer>>, + Stream>, + Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>>, + Singleton<(i32, HashMap>), Bounded, Tick, Cluster<'a, Acceptor>>, + Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, ) { let p_id = proposers.self_id(); - let (p_ballot_num_complete_cycle, p_ballot_num) = - proposers.tick_cycle_with_initial(proposers.singleton(q!(0)).latest_tick()); + let (p_next_slot, p_to_acceptors_p2a) = p_p2a( + proposers, + p_max_slot, + c_to_proposers, + p_ballot_num.clone(), + p_log_to_try_commit, + p_log_holes, + p_is_leader.clone(), + acceptors, + ); - let p_new_ballot_num = p_received_max_ballot - .clone() - .cross_singleton(p_ballot_num.clone()) - .map(q!(move |(received_max_ballot, ballot_num)| { - if received_max_ballot - > (Ballot { - num: ballot_num, - proposer_id: p_id, - }) - { - received_max_ballot.num + 1 - } else { - ballot_num - } - })); - p_ballot_num_complete_cycle.complete_next_tick(p_new_ballot_num); + // Tell clients that leader election has completed and they can begin sending messages + let p_to_clients_new_leader_elected = p_is_leader.clone() + .continue_unless(p_next_slot) + .cross_singleton(p_ballot_num) + .map(q!(move |(_is_leader, ballot_num)| Ballot { num: ballot_num, proposer_id: p_id})) // Only tell the clients once when leader election concludes + .all_ticks(); - let p_has_largest_ballot = p_received_max_ballot - .clone() - .cross_singleton(p_ballot_num.clone()) - .filter(q!( - move |(received_max_ballot, ballot_num)| *received_max_ballot - <= Ballot { - num: *ballot_num, - proposer_id: p_id - } - )); + // Acceptors. + acceptors + .source_iter(q!(["Acceptors say hello"])) + .for_each(q!(|s| println!("{}", s))); + let r_to_acceptors_checkpoint = r_to_acceptors_checkpoint(acceptors); - // End stable leader election - (p_ballot_num, p_has_largest_ballot) + // p_to_acceptors_p2a.clone().for_each(q!(|p2a: P2a| println!("Acceptor received P2a: {:?}", p2a))); + let (a_log, a_to_proposers_p2b) = acceptor_p2( + a_max_ballot.clone(), + p_to_acceptors_p2a, + r_to_acceptors_checkpoint, + proposers, + acceptors, + f, + ); + + // End tell clients that leader election has completed + let p_to_replicas = p_p2b(proposers, a_to_proposers_p2b.clone(), f); + + ( + p_to_clients_new_leader_elected, + p_to_replicas, + a_log, + a_to_proposers_p2b, + ) } -// Proposer logic to send "I am leader" messages periodically to other proposers, or send p1a to acceptors if other leaders expired. +#[derive(Clone)] +enum CheckpointOrP2a

{ + Checkpoint(i32), + P2a(P2a

), +} + +// Proposer logic to send p2as, outputting the next slot and the p2as to send to acceptors. #[expect( - clippy::too_many_arguments, clippy::type_complexity, + clippy::too_many_arguments, reason = "internal paxos code // TODO" )] -fn p_p1a<'a>( +fn p_p2a<'a, P: PaxosPayload>( + proposers: &Cluster<'a, Proposer>, + p_max_slot: Optional>, + c_to_proposers: Stream>, p_ballot_num: Singleton>, + p_log_to_try_commit: Stream, Bounded, Tick, Cluster<'a, Proposer>>, + p_log_holes: Stream, Bounded, Tick, Cluster<'a, Proposer>>, p_is_leader: Optional>, - proposers: &Cluster<'a, Proposer>, - p_to_proposers_i_am_leader: Stream>, acceptors: &Cluster<'a, Acceptor>, - i_am_leader_send_timeout: u64, // How often to heartbeat - i_am_leader_check_timeout: u64, // How often to check if heartbeat expired - i_am_leader_check_timeout_delay_multiplier: usize, /* Initial delay, multiplied by proposer pid, to stagger proposers checking for timeouts */ ) -> ( - Stream>, - Stream>, + Optional>, + Stream, Unbounded, NoTick, Cluster<'a, Acceptor>>, ) { let p_id = proposers.self_id(); - let p_to_proposers_i_am_leader_new = p_is_leader + let (p_next_slot_complete_cycle, p_next_slot) = + proposers.tick_cycle::>(); + let p_next_slot_after_reconciling_p1bs = p_max_slot + .unwrap_or(proposers.singleton(q!(-1)).latest_tick()) + // .inspect(q!(|max_slot| println!("{} p_max_slot: {:?}", context.current_tick(), max_slot))) + .continue_unless(p_next_slot.clone()) + .map(q!(|max_slot| max_slot + 1)); + + // Send p2as + let p_indexed_payloads = c_to_proposers .clone() - .then(p_ballot_num.clone()) - .latest() - .sample_every(q!(Duration::from_secs(i_am_leader_send_timeout))) - .map(q!(move |ballot_num| Ballot { - num: ballot_num, - proposer_id: p_id - })) - .broadcast_bincode_interleaved(proposers); + .tick_batch() + .enumerate() + .cross_singleton(p_next_slot.clone()) + // .inspect(q!(|next| println!("{} p_indexed_payloads next slot: {}", context.current_tick(), next)))) + .cross_singleton(p_ballot_num.clone()) + // .inspect(q!(|ballot_num| println!("{} p_indexed_payloads ballot_num: {}", context.current_tick(), ballot_num)))) + .map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { ballot: Ballot { num: ballot_num, proposer_id: p_id }, slot: next_slot + index as i32, value: payload })); + // .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a))); + let p_to_acceptors_p2a = p_log_to_try_commit + .union(p_log_holes) + .continue_unless(p_next_slot.clone()) // Only resend p1b stuff once. Once it's resent, next_slot will exist. + .union(p_indexed_payloads) + .continue_if(p_is_leader.clone()) + .all_ticks() + .broadcast_bincode_interleaved(acceptors); - let p_leader_expired = p_leader_expired( - p_to_proposers_i_am_leader, - p_is_leader, - i_am_leader_check_timeout, - ); + let p_num_payloads = c_to_proposers.clone().tick_batch().count(); + let p_exists_payloads = p_num_payloads + .clone() + .filter(q!(|num_payloads| *num_payloads > 0)); + let p_next_slot_after_sending_payloads = p_num_payloads + .continue_if(p_exists_payloads.clone()) + .clone() + .cross_singleton(p_next_slot.clone()) + .map(q!( + |(num_payloads, next_slot)| next_slot + num_payloads as i32 + )); + let p_next_slot_if_no_payloads = p_next_slot.clone().continue_unless(p_exists_payloads); + let p_new_next_slot_calculated = p_next_slot_after_reconciling_p1bs + // .inspect(q!(|slot| println!("{} p_new_next_slot_after_reconciling_p1bs: {:?}", context.current_tick(), slot))) + .union(p_next_slot_after_sending_payloads) + // .inspect(q!(|slot| println!("{} p_next_slot_after_sending_payloads: {:?}", context.current_tick(), slot)))) + .union(p_next_slot_if_no_payloads) + // .inspect(q!(|slot| println!("{} p_next_slot_if_no_payloads: {:?}", context.current_tick(), slot)))) + .continue_if(p_is_leader.clone()); + let p_new_next_slot_default = p_is_leader // Default next slot to 0 if there haven't been any payloads at all + .clone() + .continue_unless(p_new_next_slot_calculated.clone()) + .map(q!(|_| 0)); + // .inspect(q!(|slot| println!("{} p_new_next_slot_default: {:?}", context.current_tick(), slot))); + let p_new_next_slot = p_new_next_slot_calculated.union(p_new_next_slot_default); + p_next_slot_complete_cycle.complete_next_tick(p_new_next_slot); + (p_next_slot, p_to_acceptors_p2a) +} - let p_id = proposers.self_id(); +#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] +fn acceptor_p2<'a, P: PaxosPayload, R>( + a_max_ballot: Singleton>, + p_to_acceptors_p2a: Stream, Unbounded, NoTick, Cluster<'a, Acceptor>>, + r_to_acceptors_checkpoint: Stream< + (ClusterId, i32), + Unbounded, + NoTick, + Cluster<'a, Acceptor>, + >, + proposers: &Cluster<'a, Proposer>, + acceptors: &Cluster<'a, Acceptor>, + f: usize, +) -> ( + Singleton<(i32, HashMap>), Bounded, Tick, Cluster<'a, Acceptor>>, + Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, +) { + // Get the latest checkpoint sequence per replica + let a_checkpoint_largest_seqs = + r_to_acceptors_checkpoint + .tick_prefix() + .reduce_keyed(q!(|curr_seq, seq| { + if seq > *curr_seq { + *curr_seq = seq; + } + })); + let a_checkpoints_quorum_reached = a_checkpoint_largest_seqs.clone().count().filter_map(q!( + move |num_received| if num_received == f + 1 { + Some(true) + } else { + None + } + )); + // Find the smallest checkpoint seq that everyone agrees to, track whenever it changes + let a_new_checkpoint = a_checkpoint_largest_seqs + .continue_if(a_checkpoints_quorum_reached) + .map(q!(|(_sender, seq)| seq)) + .min() + .unwrap_or(acceptors.singleton(q!(-1)).latest_tick()) + .delta() + .map(q!(|min_seq| CheckpointOrP2a::Checkpoint(min_seq))); + // .inspect(q!(|(min_seq, p2a): &(i32, P2a)| println!("Acceptor new checkpoint: {:?}", min_seq))); - // Add random delay depending on node ID so not everyone sends p1a at the same time - let p_to_acceptors_p1a = p_leader_expired - .then(p_ballot_num) - .continue_if( - proposers - .source_interval_delayed( - q!(Duration::from_secs( - (p_id.raw_id * i_am_leader_check_timeout_delay_multiplier as u32).into() - )), - q!(Duration::from_secs(i_am_leader_check_timeout)), - ) - .latest_tick(), - ) - .map(q!(move |ballot_num| P1a { - ballot: Ballot { - num: ballot_num, - proposer_id: p_id + let a_p2as_to_place_in_log = p_to_acceptors_p2a + .clone() + .tick_batch() + .cross_singleton(a_max_ballot.clone().latest_tick()) // Don't consider p2as if the current ballot is higher + .filter_map(q!(|(p2a, max_ballot)| + if p2a.ballot >= max_ballot { + Some(CheckpointOrP2a::P2a(p2a)) + } else { + None } - })) + )); + let a_log = a_p2as_to_place_in_log + .union(a_new_checkpoint.into_stream()) + .persist() + .fold( + q!(|| (-1, HashMap::new())), + q!(|(prev_checkpoint, log), checkpoint_or_p2a| { + match checkpoint_or_p2a { + CheckpointOrP2a::Checkpoint(new_checkpoint) => { + // This is a checkpoint message. Delete all entries up to the checkpoint + for slot in *prev_checkpoint..new_checkpoint { + log.remove(&slot); + } + *prev_checkpoint = new_checkpoint; + } + CheckpointOrP2a::P2a(p2a) => { + // This is a regular p2a message. Insert it into the log if it is not checkpointed and has a higher ballot than what was there before + if p2a.slot > *prev_checkpoint + && log + .get(&p2a.slot) + .map(|prev_p2a: &LogValue<_>| p2a.ballot > prev_p2a.ballot) + .unwrap_or(true) + { + log.insert( + p2a.slot, + LogValue { + ballot: p2a.ballot, + value: p2a.value, + }, + ); + } + } + } + }), + ); + + let a_to_proposers_p2b = p_to_acceptors_p2a + .tick_batch() + .cross_singleton(a_max_ballot.clone().latest_tick()) + .map(q!(|(p2a, max_ballot)| ( + p2a.ballot.proposer_id, + P2b { + victory: p2a.ballot == max_ballot, + ballot: p2a.ballot, + slot: p2a.slot, + value: p2a.value + } + ))) .all_ticks() - .inspect(q!(|_| println!("Proposer leader expired, sending P1a"))) - .broadcast_bincode_interleaved(acceptors); - (p_to_proposers_i_am_leader_new, p_to_acceptors_p1a) + .send_bincode_interleaved(proposers); + (a_log, a_to_proposers_p2b) } -fn p_leader_expired<'a>( - p_to_proposers_i_am_leader: Stream>, - p_is_leader: Optional>, - i_am_leader_check_timeout: u64, // How often to check if heartbeat expired -) -> Optional, Bounded, Tick, Cluster<'a, Proposer>> { - let p_latest_received_i_am_leader = p_to_proposers_i_am_leader.clone().fold( - q!(|| None), - q!(|latest, _| { - // Note: May want to check received ballot against our own? - *latest = Some(Instant::now()); - }), - ); - - p_latest_received_i_am_leader +fn p_p2b<'a, P: PaxosPayload>( + proposers: &Cluster<'a, Proposer>, + a_to_proposers_p2b: Stream, Unbounded, NoTick, Cluster<'a, Proposer>>, + f: usize, +) -> Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>> { + let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposers.tick_cycle(); + let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposers.tick_cycle(); + let p_p2b = a_to_proposers_p2b .clone() - .latest_tick() - .continue_unless(p_is_leader) - .filter(q!(move |latest_received_i_am_leader| { - if let Some(latest_received_i_am_leader) = latest_received_i_am_leader { - (Instant::now().duration_since(*latest_received_i_am_leader)) - > Duration::from_secs(i_am_leader_check_timeout) + .tick_batch() + .union(p_persisted_p2bs); + let p_count_matching_p2bs = p_p2b + .clone() + .filter_map(q!(|p2b| if p2b.victory { + // Only consider p2bs where max ballot = ballot, which means that no one preempted us + Some(((p2b.slot, p2b.ballot), p2b.value)) + } else { + None + })) + .fold_keyed( + q!(|| (0, Default::default())), + q!(|accum, value| { + // TODO(shadaj): why is sender unused? should we de-dup? + accum.0 += 1; + accum.1 = value; + }), + ); + let p_p2b_quorum_reached = + p_count_matching_p2bs + .clone() + .filter_map(q!(move |((slot, _ballot), (count, value))| if count > f { + Some((slot, value)) } else { - true + None + })); + let p_to_replicas = p_p2b_quorum_reached + .clone() + .anti_join(p_broadcasted_p2b_slots) // Only tell the replicas about committed values once + .map(q!(|(slot, value)| (slot, value))) + .all_ticks(); + + let p_p2b_all_commit_slots = + p_count_matching_p2bs.clone().filter_map(q!( + move |((slot, _ballot), (count, _p2b))| if count == 2 * f + 1 { + Some(slot) + } else { + None } - })) + )); + // p_p2b_all_commit_slots.inspect(q!(|slot: i32| println!("Proposer slot all received: {:?}", slot))); + let p_broadcasted_p2b_slots_new = p_p2b_quorum_reached + .clone() + .map(q!(|(slot, _value)| slot)) + .filter_not_in(p_p2b_all_commit_slots.clone()); + // p_broadcasted_p2b_slots_new.inspect(q!(|slot: i32| println!("Proposer slot broadcasted: {:?}", slot))); + p_broadcasted_p2b_slots_complete_cycle.complete_next_tick(p_broadcasted_p2b_slots_new); + let p_persisted_p2bs_new = p_p2b + .clone() + .map(q!(|p2b| (p2b.slot, p2b))) + .anti_join(p_p2b_all_commit_slots.clone()) + .map(q!(|(_slot, p2b)| p2b)); + // TOOD: only persist if we are the leader + // p_persisted_p2bs_new.inspect(q!(|(sender, p2b): (u32, P2b)| println!("Proposer persisting p2b: {:?}", p2b))); + p_persisted_p2bs_complete_cycle.complete_next_tick(p_persisted_p2bs_new); + p_to_replicas } diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index adbac8d56634..5614a8d634b0 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -16,7 +16,7 @@ expression: built.ir() }, CycleSink { ident: Ident { - sym: cycle_3, + sym: cycle_4, }, location_kind: Cluster( 2, @@ -88,7 +88,7 @@ expression: built.ir() inner: : Union( CycleSource { ident: Ident { - sym: cycle_3, + sym: cycle_4, }, location_kind: Cluster( 2, @@ -117,119 +117,80 @@ expression: built.ir() location_kind: Cluster( 2, ), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), - input: Network { - from_location: Cluster( - 2, - ), - from_key: None, - to_location: Cluster( - 2, - ), - to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: Ballot) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& data) . unwrap () . into ()) }", - ], - }, + input: Tee { + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + input: Network { + from_location: Cluster( + 2, ), - ), - instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& b) . unwrap ()) }", - ], - }, + from_key: None, + to_location: Cluster( + 2, ), - ), - input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > > (__hydroflow_plus_cluster_ids_2) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ballot_num | Ballot { num : ballot_num , proposer_id : p_id } }), + to_key: None, + serialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: Ballot) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& data) . unwrap () . into ()) }", + ], + }, + ), + ), + instantiate_fn: , + deserialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: Ballot > (& b) . unwrap ()) }", + ], + }, + ), + ), + input: FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > > (__hydroflow_plus_cluster_ids_2) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Tee { - inner: , - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Tee { - inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), - input: CrossSingleton( - FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received > f { Some (true) } else { None } }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), - input: Tee { - inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , u32) , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }), - input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (p1b , ballot_num) | p1b . ballot . num == * ballot_num && p1b . ballot . proposer_id == p_id }), - input: CrossSingleton( - Persist( - Inspect { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | println ! ("Proposer received P1b: {:?}" , p1b) }), - input: Tee { - inner: , - }, - }, - ), - Tee { - inner: , - }, - ), - }, - }, - }, - }, - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Tee { - inner: : Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (received_max_ballot , ballot_num) | * received_max_ballot <= Ballot { num : * ballot_num , proposer_id : p_id } }), - input: CrossSingleton( - Tee { - inner: , - }, - Tee { - inner: , - }, - ), - }, - }, + f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ballot_num | Ballot { num : ballot_num , proposer_id : p_id } }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Tee { + inner: , + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Tee { + inner: : CycleSource { + ident: Ident { + sym: cycle_3, }, - ), + location_kind: Cluster( + 2, + ), + }, }, }, - }, - ), - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Source { - source: Interval( - stageleft :: runtime_support :: type_hint :: < core :: time :: Duration > ({ use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_send_timeout = 1u64 ; Duration :: from_secs (i_am_leader_send_timeout) }), - ), - location_kind: Cluster( - 2, ), }, - }, - ), + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Source { + source: Interval( + stageleft :: runtime_support :: type_hint :: < core :: time :: Duration > ({ use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_send_timeout = 1u64 ; Duration :: from_secs (i_am_leader_send_timeout) }), + ), + location_kind: Cluster( + 2, + ), + }, + }, + ), + }, }, }, }, @@ -243,191 +204,252 @@ expression: built.ir() location_kind: Cluster( 2, ), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), - input: Network { - from_location: Cluster( - 3, - ), - from_key: None, - to_location: Cluster( - 2, - ), - to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }", - ], - }, + input: Tee { + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + input: Network { + from_location: Cluster( + 3, ), - ), - instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }", - ], - }, + from_key: None, + to_location: Cluster( + 2, ), - ), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >)) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((p1a , max_ballot) , (_prev_checkpoint , log)) | (p1a . ballot . proposer_id , P1b { ballot : p1a . ballot , max_ballot , accepted : log }) }), - input: CrossSingleton( - CrossSingleton( - Tee { - inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1a) , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), - input: Network { - from_location: Cluster( - 2, - ), - from_key: None, - to_location: Cluster( - 3, - ), - to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1a) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1a > (& data) . unwrap () . into ()) }", - ], - }, + to_key: None, + serialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }", + ], + }, + ), + ), + instantiate_fn: , + deserialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }", + ], + }, + ), + ), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >)) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((p1a , max_ballot) , (_prev_checkpoint , log)) | (p1a . ballot . proposer_id , P1b { ballot : p1a . ballot , max_ballot , accepted : log }) }), + input: CrossSingleton( + CrossSingleton( + Tee { + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1a) , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + input: Network { + from_location: Cluster( + 2, ), - ), - instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1a > (& b) . unwrap ()) }", - ], - }, + from_key: None, + to_location: Cluster( + 3, ), - ), - input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), - input: Inspect { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | _ | println ! ("Proposer leader expired, sending P1a") }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ballot_num | P1a { ballot : Ballot { num : ballot_num , proposer_id : p_id } } }), + to_key: None, + serialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1a) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1a > (& data) . unwrap () . into ()) }", + ], + }, + ), + ), + instantiate_fn: , + deserialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1a > (& b) . unwrap ()) }", + ], + }, + ), + ), + input: FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + input: Inspect { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | _ | println ! ("Proposer leader expired, sending P1a") }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Tee { - inner: , - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; move | latest_received_i_am_leader | { if let Some (latest_received_i_am_leader) = latest_received_i_am_leader { (Instant :: now () . duration_since (* latest_received_i_am_leader)) > Duration :: from_secs (i_am_leader_check_timeout) } else { true } } }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < tokio :: time :: Instant > , ()) , core :: option :: Option < tokio :: time :: Instant > > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Tee { - inner: : Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < tokio :: time :: Instant > > ({ use crate :: __staged :: cluster :: paxos :: * ; | | None }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | latest , _ | { * latest = Some (Instant :: now ()) ; } }), - input: Persist( - Tee { - inner: , - }, - ), + f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ballot_num | P1a { ballot : Ballot { num : ballot_num , proposer_id : p_id } } }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Tee { + inner: , + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Filter { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; move | latest_received_i_am_leader | { if let Some (latest_received_i_am_leader) = latest_received_i_am_leader { (Instant :: now () . duration_since (* latest_received_i_am_leader)) > Duration :: from_secs (i_am_leader_check_timeout) } else { true } } }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < tokio :: time :: Instant > , ()) , core :: option :: Option < tokio :: time :: Instant > > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Tee { + inner: : Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < tokio :: time :: Instant > > ({ use crate :: __staged :: cluster :: paxos :: * ; | | None }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | latest , _ | { * latest = Some (Instant :: now ()) ; } }), + input: Persist( + Tee { + inner: , + }, + ), + }, }, - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), - input: Tee { - inner: , + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Filter { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + input: Tee { + inner: , + }, }, }, }, - }, - ), + ), + }, }, }, - }, - ), - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), - input: Source { - source: Stream( - { use hydroflow_plus :: __staged :: location :: * ; let delay = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout_delay_multiplier = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; Duration :: from_secs ((p_id . raw_id * i_am_leader_check_timeout_delay_multiplier as u32) . into ()) } ; let interval = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; Duration :: from_secs (i_am_leader_check_timeout) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval_at (tokio :: time :: Instant :: now () + delay , interval)) }, - ), - location_kind: Cluster( - 2, ), }, - }, - ), + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Source { + source: Stream( + { use hydroflow_plus :: __staged :: location :: * ; let delay = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout_delay_multiplier = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; Duration :: from_secs ((p_id . raw_id * i_am_leader_check_timeout_delay_multiplier as u32) . into ()) } ; let interval = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; Duration :: from_secs (i_am_leader_check_timeout) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval_at (tokio :: time :: Instant :: now () + delay , interval)) }, + ), + location_kind: Cluster( + 2, + ), + }, + }, + ), + }, }, }, }, }, }, }, - }, - Tee { - inner: : Union( - Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), - input: Persist( - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | p1a . ballot }), - input: Inspect { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | println ! ("Acceptor received P1a: {:?}" , p1a) }), - input: Tee { - inner: , + Tee { + inner: : Union( + Reduce { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), + input: Persist( + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | p1a . ballot }), + input: Inspect { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | println ! ("Acceptor received P1a: {:?}" , p1a) }), + input: Tee { + inner: , + }, }, }, - }, - ), - }, - Persist( - Source { - source: Iter( - { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } } ; [e] }, - ), - location_kind: Cluster( - 3, ), }, + Persist( + Source { + source: Iter( + { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } } ; [e] }, + ), + location_kind: Cluster( + 3, + ), + }, + ), ), + }, + ), + CycleSource { + ident: Ident { + sym: cycle_0, + }, + location_kind: Cluster( + 3, ), }, ), - CycleSource { - ident: Ident { - sym: cycle_0, + }, + }, + }, + }, + }, + CycleSink { + ident: Ident { + sym: cycle_3, + }, + location_kind: Cluster( + 2, + ), + input: Tee { + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + input: CrossSingleton( + FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received > f { Some (true) } else { None } }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + input: Tee { + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , u32) , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }), + input: Filter { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (p1b , ballot_num) | p1b . ballot . num == * ballot_num && p1b . ballot . proposer_id == p_id }), + input: CrossSingleton( + Persist( + Inspect { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | println ! ("Proposer received P1b: {:?}" , p1b) }), + input: Tee { + inner: , + }, + }, + ), + Tee { + inner: , + }, + ), + }, + }, }, - location_kind: Cluster( - 3, - ), }, - ), - }, + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + input: Tee { + inner: : Filter { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (received_max_ballot , ballot_num) | * received_max_ballot <= Ballot { num : * ballot_num , proposer_id : p_id } }), + input: CrossSingleton( + Tee { + inner: , + }, + Tee { + inner: , + }, + ), + }, + }, + }, + ), }, }, }, CycleSink { ident: Ident { - sym: cycle_4, + sym: cycle_5, }, location_kind: Cluster( 2, @@ -435,7 +457,7 @@ expression: built.ir() input: DeferTick( Union( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , ()) , i32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( Union( @@ -447,18 +469,18 @@ expression: built.ir() input: CrossSingleton( Union( Tee { - inner: : Reduce { + inner: : Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), input: Tee { - inner: : FoldKeyed { + inner: : FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , LogValue { ballot : Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } , value : Default :: default () }) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_entry , new_entry | { let same_values = new_entry . value == curr_entry . 1 . value ; let higher_ballot = new_entry . ballot > curr_entry . 1 . ballot ; if same_values { curr_entry . 0 += 1 ; } if higher_ballot { curr_entry . 1 . ballot = new_entry . ballot ; if ! same_values { curr_entry . 0 = 1 ; curr_entry . 1 . value = new_entry . value ; } } } }), input: FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , std :: collections :: hash_map :: IntoIter < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | p1b . accepted . into_iter () }), input: Tee { - inner: , + inner: , }, }, }, @@ -485,9 +507,9 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: : CycleSource { + inner: : CycleSource { ident: Ident { - sym: cycle_4, + sym: cycle_5, }, location_kind: Cluster( 2, @@ -504,15 +526,15 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (num_payloads , next_slot) | next_slot + num_payloads as i32 }), input: CrossSingleton( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , ()) , usize > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( Tee { - inner: : Fold { + inner: : Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -560,10 +582,10 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), input: Tee { - inner: : Filter { + inner: : Filter { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; | num_payloads | * num_payloads > 0 }), input: Tee { - inner: , + inner: , }, }, }, @@ -572,7 +594,7 @@ expression: built.ir() }, }, Tee { - inner: , + inner: , }, ), }, @@ -581,7 +603,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , ()) , i32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( Tee { - inner: , + inner: , }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), @@ -591,7 +613,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -602,7 +624,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), input: Tee { - inner: , + inner: , }, }, ), @@ -614,7 +636,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( Tee { - inner: , + inner: , }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), @@ -624,7 +646,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -648,7 +670,7 @@ expression: built.ir() }, CycleSink { ident: Ident { - sym: cycle_5, + sym: cycle_6, }, location_kind: Cluster( 2, @@ -658,18 +680,18 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _value) | slot }), input: Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload)) , core :: option :: Option < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , value)) | if count > f { Some ((slot , value)) } else { None } }), input: Tee { - inner: : FoldKeyed { + inner: : FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , Default :: default ()) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | accum , value | { accum . 0 += 1 ; accum . 1 = value ; } }), input: FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , core :: option :: Option < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | if p2b . victory { Some (((p2b . slot , p2b . ballot) , p2b . value)) } else { None } }), input: Tee { - inner: : Union( + inner: : Union( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -705,7 +727,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | (p2a . ballot . proposer_id , P2b { victory : p2a . ballot == max_ballot , ballot : p2a . ballot , slot : p2a . slot , value : p2a . value }) }), input: CrossSingleton( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -751,7 +773,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ((slot , (count , entry)) , ballot_num) | if count <= f as u32 { Some (P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id , } , slot , value : entry . value , }) } else { None } }), input: CrossSingleton( Tee { - inner: , + inner: , }, Tee { inner: , @@ -765,13 +787,13 @@ expression: built.ir() FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: ops :: Range < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | 0 .. max_slot }), input: Tee { - inner: , + inner: , }, }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), input: Tee { - inner: , + inner: , }, }, ), @@ -789,7 +811,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -802,11 +824,11 @@ expression: built.ir() CrossSingleton( Enumerate( Tee { - inner: , + inner: , }, ), Tee { - inner: , + inner: , }, ), Tee { @@ -818,7 +840,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), input: Tee { - inner: , + inner: , }, }, ), @@ -837,7 +859,7 @@ expression: built.ir() }, CycleSource { ident: Ident { - sym: cycle_6, + sym: cycle_7, }, location_kind: Cluster( 2, @@ -852,10 +874,10 @@ expression: built.ir() }, }, Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload)) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , _p2b)) | if count == 2 * f + 1 { Some (slot) } else { None } }), input: Tee { - inner: , + inner: , }, }, }, @@ -864,7 +886,7 @@ expression: built.ir() }, CycleSink { ident: Ident { - sym: cycle_6, + sym: cycle_7, }, location_kind: Cluster( 2, @@ -876,11 +898,11 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | (p2b . slot , p2b) }), input: Tee { - inner: , + inner: , }, }, Tee { - inner: , + inner: , }, ), }, @@ -894,7 +916,7 @@ expression: built.ir() 3, ), input: Tee { - inner: : Fold { + inner: : Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (- 1 , HashMap :: new ()) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >) , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , checkpoint_or_p2a | { match checkpoint_or_p2a { CheckpointOrP2a :: Checkpoint (new_checkpoint) => { for slot in * prev_checkpoint .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = new_checkpoint ; } CheckpointOrP2a :: P2a (p2a) => { if p2a . slot > * prev_checkpoint && log . get (& p2a . slot) . map (| prev_p2a : & LogValue < _ > | p2a . ballot > prev_p2a . ballot) . unwrap_or (true) { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } } }), input: Persist( @@ -903,7 +925,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some (CheckpointOrP2a :: P2a (p2a)) } else { None } }), input: CrossSingleton( Tee { - inner: , + inner: , }, Tee { inner: , @@ -922,7 +944,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , ()) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), input: CrossSingleton( Tee { - inner: : ReduceKeyed { + inner: : ReduceKeyed { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_seq , seq | { if seq > * curr_seq { * curr_seq = seq ; } } }), input: Persist( Network { @@ -978,7 +1000,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -1013,7 +1035,7 @@ expression: built.ir() 2, ), input: Tee { - inner: , + inner: , }, }, CycleSink { @@ -1030,10 +1052,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq > * highest_seq }), input: CrossSingleton( Tee { - inner: : Sort( + inner: : Sort( Union( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( @@ -1073,11 +1095,11 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , value) | (slot , value) }), input: AntiJoin( Tee { - inner: , + inner: , }, CycleSource { ident: Ident { - sym: cycle_5, + sym: cycle_6, }, location_kind: Cluster( 2, @@ -1102,12 +1124,12 @@ expression: built.ir() ), }, Tee { - inner: : Fold { + inner: : Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < i32 > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | - 1 }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | filled_slot , (sorted_payload , highest_seq) | { let next_slot = std :: cmp :: max (* filled_slot , highest_seq) ; * filled_slot = if sorted_payload . seq == next_slot + 1 { sorted_payload . seq } else { * filled_slot } ; } }), input: CrossSingleton( Tee { - inner: , + inner: , }, Union( CycleSource { @@ -1144,23 +1166,23 @@ expression: built.ir() ), input: DeferTick( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_kv_store , highest_seq) | highest_seq }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | (HashMap :: < u32 , String > :: new () , - 1) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | state , payload | { let kv_store = & mut state . 0 ; let last_seq = & mut state . 1 ; kv_store . insert (payload . key , payload . value) ; debug_assert ! (payload . seq == * last_seq + 1 , "Hole in log between seq {} and {}" , * last_seq , payload . seq) ; * last_seq = payload . seq ; } }), input: Persist( Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , _) | { sorted_payload } }), input: Filter { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq <= * highest_seq }), input: CrossSingleton( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1181,7 +1203,7 @@ expression: built.ir() ), input: DeferTick( Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , i32) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let checkpoint_frequency = 1usize ; move | (max_checkpointed_seq , new_highest_seq) | if new_highest_seq - max_checkpointed_seq >= checkpoint_frequency as i32 { Some (new_highest_seq) } else { None } }), input: CrossSingleton( Union( @@ -1210,7 +1232,7 @@ expression: built.ir() ), ), Tee { - inner: , + inner: , }, ), }, @@ -1225,7 +1247,7 @@ expression: built.ir() 1, ), input: Tee { - inner: , + inner: , }, }, CycleSink { @@ -1238,7 +1260,7 @@ expression: built.ir() input: DeferTick( AntiJoin( Tee { - inner: : Union( + inner: : Union( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) , (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sender , replica_payload) | (replica_payload . key , sender) }), input: Network { @@ -1274,7 +1296,7 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (ClusterId :: from_raw (payload . value . parse :: < u32 > () . unwrap ()) , payload) }), input: Tee { - inner: , + inner: , }, }, }, @@ -1290,13 +1312,13 @@ expression: built.ir() ), }, Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , usize) , core :: option :: Option < u32 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let f = 1usize ; move | (key , count) | { if count == f + 1 { Some (key) } else { None } } }), input: FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | 0 }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | curr_count , _sender | { * curr_count += 1 ; } }), input: Tee { - inner: , + inner: , }, }, }, @@ -1317,7 +1339,7 @@ expression: built.ir() input: Union( Union( Tee { - inner: : CycleSource { + inner: : CycleSource { ident: Ident { sym: cycle_2, }, @@ -1331,9 +1353,9 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: time :: SystemTime > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | SystemTime :: now () }), input: Tee { - inner: : Delta( + inner: : Delta( Tee { - inner: : Reduce { + inner: : Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), input: Persist( Inspect { @@ -1379,7 +1401,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( Tee { - inner: , + inner: , }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), @@ -1389,7 +1411,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -1414,10 +1436,10 @@ expression: built.ir() }, ), Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , (usize , std :: time :: SystemTime) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | key | (key as usize , SystemTime :: now ()) }), input: Tee { - inner: , + inner: , }, }, }, @@ -1430,7 +1452,7 @@ expression: built.ir() input: CrossSingleton( CrossSingleton( Tee { - inner: : Source { + inner: : Source { source: Interval( { use crate :: __staged :: cluster :: paxos_bench :: * ; Duration :: from_secs (1) }, ), @@ -1448,10 +1470,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (std :: time :: SystemTime , std :: time :: SystemTime)) , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time) . unwrap () . as_micros ()) }), input: Join( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1459,7 +1481,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < () , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }), input: Tee { - inner: , + inner: , }, }, ), @@ -1481,7 +1503,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , u32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, Map { @@ -1492,7 +1514,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -1504,7 +1526,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < () , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | (0 , true) }), input: Tee { - inner: , + inner: , }, }, ), @@ -1524,17 +1546,17 @@ expression: built.ir() FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; let num_clients_per_node = 1usize ; move | leader_ballot | (0 .. num_clients_per_node) . map (move | i | (leader_ballot . leader_id () , ClientPayload { key : i as u32 , value : c_id . raw_id . to_string () })) }), input: Tee { - inner: , + inner: , }, }, Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (key , leader_ballot) | (leader_ballot . leader_id () , ClientPayload { key , value : c_id . raw_id . to_string () }) }), input: CrossSingleton( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), },