diff --git a/hydroflow_plus/src/location.rs b/hydroflow_plus/src/location.rs index 66b6aee9292..65b05334fe6 100644 --- a/hydroflow_plus/src/location.rs +++ b/hydroflow_plus/src/location.rs @@ -148,36 +148,26 @@ pub trait Location<'a> { fn source_interval( &self, interval: impl Quoted<'a, Duration> + Copy + 'a, - ) -> Optional<(), Unbounded, NoTick, Self> + ) -> Stream where Self: Sized, { - let interval = interval.splice_untyped(); - - Optional::new( - self.id(), - self.flow_state().clone(), - HfPlusNode::Persist(Box::new(HfPlusNode::Source { - source: HfPlusSource::Interval(interval.into()), - location_kind: self.id(), - })), - ) + self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new( + tokio::time::interval(interval) + ))) } fn source_interval_delayed( &self, delay: impl Quoted<'a, Duration> + Copy + 'a, interval: impl Quoted<'a, Duration> + Copy + 'a, - ) -> Optional + ) -> Stream where Self: Sized, { self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new( tokio::time::interval_at(tokio::time::Instant::now() + delay, interval) ))) - .tick_batch() - .first() - .latest() } fn forward_ref>( diff --git a/hydroflow_plus_test/src/cluster/mod.rs b/hydroflow_plus_test/src/cluster/mod.rs index bcc449c3195..b4e5be9bb00 100644 --- a/hydroflow_plus_test/src/cluster/mod.rs +++ b/hydroflow_plus_test/src/cluster/mod.rs @@ -3,5 +3,6 @@ pub mod many_to_many; pub mod map_reduce; pub mod paxos; pub mod paxos_bench; +pub mod paxos_kv; pub mod simple_cluster; pub mod two_pc; diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index 2f704830447..05dcb8382f5 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -56,33 +56,29 @@ struct P2b

{ value: P, } -#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] +#[expect( + clippy::too_many_arguments, + clippy::type_complexity, + reason = "internal paxos code // TODO" +)] pub fn paxos_core<'a, P: PaxosPayload, R>( - flow: &FlowBuilder<'a>, - r_to_acceptors_checkpoint: impl FnOnce( - &Cluster<'a, Acceptor>, - ) -> Stream< + proposers: &Cluster<'a, Proposer>, + acceptors: &Cluster<'a, Acceptor>, + r_to_acceptors_checkpoint: Stream< (ClusterId, i32), Unbounded, NoTick, Cluster<'a, Acceptor>, >, - c_to_proposers: impl FnOnce( - &Cluster<'a, Proposer>, - ) -> Stream>, + c_to_proposers: Stream>, f: usize, i_am_leader_send_timeout: u64, i_am_leader_check_timeout: u64, i_am_leader_check_timeout_delay_multiplier: usize, ) -> ( - Cluster<'a, Proposer>, - Cluster<'a, Acceptor>, Stream<(), Unbounded, NoTick, Cluster<'a, Proposer>>, Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>>, ) { - let proposers = flow.cluster::(); - let acceptors = flow.cluster::(); - proposers .source_iter(q!(["Proposers say hello"])) .for_each(q!(|s| println!("{}", s))); @@ -97,8 +93,8 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( acceptors.tick_forward_ref::>), _, _, _>>(); let (p_ballot_num, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election( - &proposers, - &acceptors, + proposers, + acceptors, f, i_am_leader_send_timeout, i_am_leader_check_timeout, @@ -118,17 +114,15 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( .all_ticks(); let (p_log_to_try_commit, p_max_slot, p_log_holes) = - recommit_after_leader_election(&proposers, p_relevant_p1bs, p_ballot_num.clone(), f); + recommit_after_leader_election(proposers, p_relevant_p1bs, p_ballot_num.clone(), f); let p_log_to_recommit = p_log_to_try_commit .union(p_log_holes) .continue_if(just_became_leader); // Only resend p1b stuff once the moment we become leader. - let c_to_proposers = c_to_proposers(&proposers); - let (p_to_replicas, a_log, a_to_proposers_p2b) = sequence_payload( - &proposers, - &acceptors, + proposers, + acceptors, c_to_proposers, r_to_acceptors_checkpoint, p_ballot_num, @@ -142,12 +136,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( a_log_complete_cycle.complete(a_log); a_to_proposers_p2b_complete_cycle.complete(a_to_proposers_p2b); - ( - proposers, - acceptors, - p_to_clients_new_leader_elected, - p_to_replicas, - ) + (p_to_clients_new_leader_elected, p_to_replicas) } #[expect( @@ -350,7 +339,8 @@ fn p_leader_heartbeat<'a>( )), q!(Duration::from_secs(i_am_leader_check_timeout)), ) - .latest_tick(), + .tick_batch() + .first(), ); (p_to_proposers_i_am_leader, p_trigger_election) } @@ -532,9 +522,7 @@ fn sequence_payload<'a, P: PaxosPayload, R>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, c_to_proposers: Stream>, - r_to_acceptors_checkpoint: impl FnOnce( - &Cluster<'a, Acceptor>, - ) -> Stream< + r_to_acceptors_checkpoint: Stream< (ClusterId, i32), Unbounded, NoTick, @@ -565,8 +553,6 @@ fn sequence_payload<'a, P: PaxosPayload, R>( ); // Acceptors. - let r_to_acceptors_checkpoint = r_to_acceptors_checkpoint(acceptors); - // p_to_acceptors_p2a.clone().for_each(q!(|p2a: P2a| println!("Acceptor received P2a: {:?}", p2a))); let (a_log, a_to_proposers_p2b) = acceptor_p2( a_max_ballot.clone(), diff --git a/hydroflow_plus_test/src/cluster/paxos_bench.rs b/hydroflow_plus_test/src/cluster/paxos_bench.rs index a1a0ec69059..2332a176597 100644 --- a/hydroflow_plus_test/src/cluster/paxos_bench.rs +++ b/hydroflow_plus_test/src/cluster/paxos_bench.rs @@ -1,43 +1,15 @@ use std::cell::RefCell; -use std::collections::HashMap; use std::rc::Rc; use std::time::{Duration, SystemTime}; use hydroflow_plus::*; -use serde::{Deserialize, Serialize}; use stageleft::*; -use super::paxos::{paxos_core, Acceptor, PaxosPayload, Proposer}; - -pub struct Replica {} +use super::paxos::{Acceptor, Proposer}; +use super::paxos_kv::{paxos_kv, KvPayload, Replica, SequencedKv}; pub struct Client {} -#[derive(Serialize, serde::Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)] -pub struct ReplicaPayload { - // Note: Important that seq is the first member of the struct for sorting - pub seq: i32, - pub key: u32, - pub value: String, -} - -#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] -pub struct ClientPayload { - pub key: u32, - pub value: String, -} - -impl Default for ClientPayload { - fn default() -> Self { - Self { - key: 0, - value: "".to_string(), - } - } -} - -impl PaxosPayload for ClientPayload {} - // Important: By convention, all relations that represent booleans either have a single "true" value or nothing. // This allows us to use the continue_if_exists() and continue_if_empty() operators as if they were if (true) and if (false) statements. #[expect(clippy::too_many_arguments, reason = "internal paxos code // TODO")] @@ -56,196 +28,48 @@ pub fn paxos_bench<'a>( Cluster<'a, Client>, Cluster<'a, Replica>, ) { + let proposers = flow.cluster::(); + let acceptors = flow.cluster::(); let clients = flow.cluster::(); let replicas = flow.cluster::(); - let (c_to_proposers_complete_cycle, c_to_proposers) = clients.forward_ref(); - - let (proposers, acceptors, p_to_clients_new_leader_elected, r_new_processed_payloads) = - paxos_with_replica( - flow, - &replicas, - c_to_proposers, - f, - i_am_leader_send_timeout, - i_am_leader_check_timeout, - i_am_leader_check_timeout_delay_multiplier, - checkpoint_frequency, - ); + let (new_leader_elected_complete, new_leader_elected) = + clients.forward_ref::>(); - c_to_proposers_complete_cycle.complete(bench_client( + bench_client( &clients, - p_to_clients_new_leader_elected - .broadcast_bincode(&clients) - .map(q!(|(leader_id, _)| leader_id)), - r_new_processed_payloads.send_bincode(&clients), + new_leader_elected, + |c_to_proposers| { + let (new_leader_elected, processed_payloads) = paxos_kv( + &proposers, + &acceptors, + &replicas, + c_to_proposers.send_bincode_interleaved(&proposers), + f, + i_am_leader_send_timeout, + i_am_leader_check_timeout, + i_am_leader_check_timeout_delay_multiplier, + checkpoint_frequency, + ); + + new_leader_elected_complete.complete( + new_leader_elected + .broadcast_bincode(&clients) + .map(q!(|(leader_id, _)| leader_id)), + ); + processed_payloads + .map(q!(|payload| ( + ClusterId::from_raw(payload.kv.value.parse::().unwrap()), + payload + ))) + .send_bincode(&clients) + }, num_clients_per_node, median_latency_window_size, f, - )); - - (proposers, acceptors, clients, replicas) -} - -#[expect( - clippy::type_complexity, - clippy::too_many_arguments, - reason = "internal paxos code // TODO" -)] -fn paxos_with_replica<'a>( - flow: &FlowBuilder<'a>, - replicas: &Cluster<'a, Replica>, - c_to_proposers: Stream< - (ClusterId, ClientPayload), - Unbounded, - NoTick, - Cluster<'a, Client>, - >, - f: usize, - i_am_leader_send_timeout: u64, - i_am_leader_check_timeout: u64, - i_am_leader_check_timeout_delay_multiplier: usize, - checkpoint_frequency: usize, -) -> ( - Cluster<'a, Proposer>, - Cluster<'a, Acceptor>, - Stream<(), Unbounded, NoTick, Cluster<'a, Proposer>>, - Stream<(ClusterId, ReplicaPayload), Unbounded, NoTick, Cluster<'a, Replica>>, -) { - let (r_to_acceptors_checkpoint_complete_cycle, r_to_acceptors_checkpoint) = - replicas.forward_ref::>(); - - let (proposers, acceptors, p_to_clients_new_leader_elected, p_to_replicas) = paxos_core( - flow, - |acceptors| r_to_acceptors_checkpoint.broadcast_bincode(acceptors), - |proposers| c_to_proposers.send_bincode_interleaved(proposers), - f, - i_am_leader_send_timeout, - i_am_leader_check_timeout, - i_am_leader_check_timeout_delay_multiplier, - ); - - let (r_to_acceptors_checkpoint_new, r_new_processed_payloads) = replica( - replicas, - p_to_replicas - .map(q!(|(slot, data)| ReplicaPayload { - seq: slot, - key: data.key, - value: data.value - })) - .broadcast_bincode_interleaved(replicas), - checkpoint_frequency, ); - r_to_acceptors_checkpoint_complete_cycle.complete(r_to_acceptors_checkpoint_new); - - ( - proposers, - acceptors, - p_to_clients_new_leader_elected, - r_new_processed_payloads, - ) -} - -// Replicas. All relations for replicas will be prefixed with r. Expects ReplicaPayload on p_to_replicas, outputs a stream of (client address, ReplicaPayload) after processing. -#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] -pub fn replica<'a>( - replicas: &Cluster<'a, Replica>, - p_to_replicas: Stream>, - checkpoint_frequency: usize, -) -> ( - Stream>, - Stream<(ClusterId, ReplicaPayload), Unbounded, NoTick, Cluster<'a, Replica>>, -) { - let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = replicas.tick_cycle(); - // p_to_replicas.inspect(q!(|payload: ReplicaPayload| println!("Replica received payload: {:?}", payload))); - let r_sorted_payloads = p_to_replicas - .clone() - .tick_batch() - .union(r_buffered_payloads) // Combine with all payloads that we've received and not processed yet - .sort(); - // Create a cycle since we'll use this seq before we define it - let (r_highest_seq_complete_cycle, r_highest_seq) = - replicas.tick_cycle::>(); - let empty_slot = replicas.singleton_first_tick(q!(-1)); - // Either the max sequence number executed so far or -1. Need to union otherwise r_highest_seq is empty and joins with it will fail - let r_highest_seq_with_default = r_highest_seq.union(empty_slot); - // Find highest the sequence number of any payload that can be processed in this tick. This is the payload right before a hole. - let r_highest_seq_processable_payload = r_sorted_payloads - .clone() - .cross_singleton(r_highest_seq_with_default) - .fold( - q!(|| -1), - q!(|filled_slot, (sorted_payload, highest_seq)| { - // Note: This function only works if the input is sorted on seq. - let next_slot = std::cmp::max(*filled_slot, highest_seq); - - *filled_slot = if sorted_payload.seq == next_slot + 1 { - sorted_payload.seq - } else { - *filled_slot - }; - }), - ); - // Find all payloads that can and cannot be processed in this tick. - let r_processable_payloads = r_sorted_payloads - .clone() - .cross_singleton(r_highest_seq_processable_payload.clone()) - .filter(q!( - |(sorted_payload, highest_seq)| sorted_payload.seq <= *highest_seq - )) - .map(q!(|(sorted_payload, _)| { sorted_payload })); - let r_new_non_processable_payloads = r_sorted_payloads - .clone() - .cross_singleton(r_highest_seq_processable_payload.clone()) - .filter(q!( - |(sorted_payload, highest_seq)| sorted_payload.seq > *highest_seq - )) - .map(q!(|(sorted_payload, _)| { sorted_payload })); - // Save these, we can process them once the hole has been filled - r_buffered_payloads_complete_cycle.complete_next_tick(r_new_non_processable_payloads); - - let r_kv_store = r_processable_payloads - .clone() - .persist() // Optimization: all_ticks() + fold() = fold, where the state of the previous fold is saved and persisted values are deleted. - .fold(q!(|| (HashMap::::new(), -1)), q!(|state, payload| { - let kv_store = &mut state.0; - let last_seq = &mut state.1; - kv_store.insert(payload.key, payload.value); - debug_assert!(payload.seq == *last_seq + 1, "Hole in log between seq {} and {}", *last_seq, payload.seq); - *last_seq = payload.seq; - // println!("Replica kv store: {:?}", kv_store); - })); - // Update the highest seq for the next tick - let r_new_highest_seq = r_kv_store.map(q!(|(_kv_store, highest_seq)| highest_seq)); - r_highest_seq_complete_cycle.complete_next_tick(r_new_highest_seq.clone().into()); - - // Send checkpoints to the acceptors when we've processed enough payloads - let (r_checkpointed_seqs_complete_cycle, r_checkpointed_seqs) = - replicas.tick_cycle::>(); - let r_max_checkpointed_seq = r_checkpointed_seqs - .persist() - .max() - .unwrap_or(replicas.singleton(q!(-1)).latest_tick()); - let r_checkpoint_seq_new = r_max_checkpointed_seq - .cross_singleton(r_new_highest_seq) - .filter_map(q!( - move |(max_checkpointed_seq, new_highest_seq)| if new_highest_seq - max_checkpointed_seq - >= checkpoint_frequency as i32 - { - Some(new_highest_seq) - } else { - None - } - )); - r_checkpointed_seqs_complete_cycle.complete_next_tick(r_checkpoint_seq_new.clone()); - - // Tell clients that the payload has been committed. All ReplicaPayloads contain the client's machine ID (to string) as value. - let r_to_clients = p_to_replicas.map(q!(|payload| ( - ClusterId::from_raw(payload.value.parse::().unwrap()), - payload - ))); - (r_checkpoint_seq_new.all_ticks(), r_to_clients) + (proposers, acceptors, clients, replicas) } // Clients. All relations for clients will be prefixed with c. All ClientPayloads will contain the virtual client number as key and the client's machine ID (to string) as value. Expects p_to_clients_leader_elected containing Ballots whenever the leader is elected, and r_to_clients_payload_applied containing ReplicaPayloads whenever a payload is committed. Outputs (leader address, ClientPayload) when a new leader is elected or when the previous payload is committed. @@ -257,8 +81,10 @@ fn bench_client<'a>( NoTick, Cluster<'a, Client>, >, - r_to_clients_payload_applied: Stream< - (ClusterId, ReplicaPayload), + transaction_cycle: impl FnOnce( + Stream<(ClusterId, KvPayload), Unbounded, NoTick, Cluster<'a, Client>>, + ) -> Stream< + (ClusterId, SequencedKv), Unbounded, NoTick, Cluster<'a, Client>, @@ -266,17 +92,17 @@ fn bench_client<'a>( num_clients_per_node: usize, median_latency_window_size: usize, f: usize, -) -> Stream<(ClusterId, ClientPayload), Unbounded, NoTick, Cluster<'a, Client>> { +) { let c_id = clients.self_id(); // r_to_clients_payload_applied.clone().inspect(q!(|payload: &(u32, ReplicaPayload)| println!("Client received payload: {:?}", payload))); // Only keep the latest leader - let c_max_leader_ballot = p_to_clients_leader_elected + let current_leader = p_to_clients_leader_elected .inspect(q!(|ballot| println!( "Client notified that leader was elected: {:?}", ballot ))) .max(); - let c_new_leader_ballot = c_max_leader_ballot.clone().latest_tick().delta(); + let c_new_leader_ballot = current_leader.clone().latest_tick().delta(); // Whenever the leader changes, make all clients send a message let c_new_payloads_when_leader_elected = c_new_leader_ballot @@ -284,19 +110,23 @@ fn bench_client<'a>( .flat_map(q!(move |leader_ballot| (0..num_clients_per_node).map( move |i| ( leader_ballot, - ClientPayload { + KvPayload { key: i as u32, value: c_id.raw_id.to_string() } ) ))); + + let (c_to_proposers_complete_cycle, c_to_proposers) = clients.forward_ref(); + let transaction_results = transaction_cycle(c_to_proposers); + // Whenever replicas confirm that a payload was committed, collected it and wait for a quorum let (c_pending_quorum_payloads_complete_cycle, c_pending_quorum_payloads) = clients.tick_cycle(); - let c_received_payloads = r_to_clients_payload_applied + let c_received_payloads = transaction_results .tick_batch() .map(q!(|(sender, replica_payload)| ( - replica_payload.key, + replica_payload.kv.key, sender ))) .union(c_pending_quorum_payloads); @@ -321,17 +151,19 @@ fn bench_client<'a>( // Whenever all replicas confirm that a payload was committed, send another payload let c_new_payloads_when_committed = c_received_quorum_payloads .clone() - .cross_singleton(c_max_leader_ballot.clone().latest_tick()) - .map(q!(move |(key, leader_ballot)| ( - leader_ballot, - ClientPayload { + .cross_singleton(current_leader.clone().latest_tick()) + .map(q!(move |(key, cur_leader)| ( + cur_leader, + KvPayload { key, value: c_id.raw_id.to_string() } ))); - let c_to_proposers = c_new_payloads_when_leader_elected - .union(c_new_payloads_when_committed) - .all_ticks(); + c_to_proposers_complete_cycle.complete( + c_new_payloads_when_leader_elected + .union(c_new_payloads_when_committed) + .all_ticks(), + ); // Track statistics let (c_timers_complete_cycle, c_timers) = @@ -355,13 +187,12 @@ fn bench_client<'a>( })); c_timers_complete_cycle.complete_next_tick(c_new_timers); - let c_stats_output_timer = clients.source_interval(q!(Duration::from_secs(1))); + let c_stats_output_timer = clients + .source_interval(q!(Duration::from_secs(1))) + .tick_batch() + .first(); - let c_latency_reset = c_stats_output_timer - .clone() - .latest_tick() - .map(q!(|_| None)) - .defer_tick(); + let c_latency_reset = c_stats_output_timer.clone().map(q!(|_| None)).defer_tick(); let c_latencies = c_timers .join(c_updated_timers) @@ -396,12 +227,11 @@ fn bench_client<'a>( let c_throughput_new_batch = c_received_quorum_payloads .clone() .count() - .continue_unless(c_stats_output_timer.clone().latest_tick()) + .continue_unless(c_stats_output_timer.clone()) .map(q!(|batch_size| (batch_size, false))); let c_throughput_reset = c_stats_output_timer .clone() - .latest_tick() .map(q!(|_| (0, true))) .defer_tick(); @@ -422,7 +252,7 @@ fn bench_client<'a>( c_latencies .cross_singleton(c_throughput) .latest_tick() - .continue_if(c_stats_output_timer.latest_tick()) + .continue_if(c_stats_output_timer) .all_ticks() .for_each(q!(move |(latencies, throughput)| { let mut latencies_mut = latencies.borrow_mut(); @@ -435,7 +265,6 @@ fn bench_client<'a>( println!("Throughput: {} requests/s", throughput); })); // End track statistics - c_to_proposers } #[cfg(test)] diff --git a/hydroflow_plus_test/src/cluster/paxos_kv.rs b/hydroflow_plus_test/src/cluster/paxos_kv.rs new file mode 100644 index 00000000000..ba9a40e73ef --- /dev/null +++ b/hydroflow_plus_test/src/cluster/paxos_kv.rs @@ -0,0 +1,177 @@ +use std::collections::HashMap; + +use hydroflow_plus::*; +use serde::{Deserialize, Serialize}; +use stageleft::*; + +use super::paxos::{paxos_core, Acceptor, PaxosPayload, Proposer}; + +pub struct Replica {} + +#[derive(Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)] +pub struct KvPayload { + pub key: u32, + pub value: String, +} + +impl Default for KvPayload { + fn default() -> Self { + Self { + key: 0, + value: "".to_string(), + } + } +} + +impl PaxosPayload for KvPayload {} + +#[derive(Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)] +pub struct SequencedKv { + // Note: Important that seq is the first member of the struct for sorting + pub seq: i32, + pub kv: KvPayload, +} + +#[expect( + clippy::type_complexity, + clippy::too_many_arguments, + reason = "internal paxos code // TODO" +)] +pub fn paxos_kv<'a>( + proposers: &Cluster<'a, Proposer>, + acceptors: &Cluster<'a, Acceptor>, + replicas: &Cluster<'a, Replica>, + c_to_proposers: Stream>, + f: usize, + i_am_leader_send_timeout: u64, + i_am_leader_check_timeout: u64, + i_am_leader_check_timeout_delay_multiplier: usize, + checkpoint_frequency: usize, +) -> ( + Stream<(), Unbounded, NoTick, Cluster<'a, Proposer>>, + Stream>, +) { + let (r_to_acceptors_checkpoint_complete_cycle, r_to_acceptors_checkpoint) = + replicas.forward_ref::>(); + + let (p_to_clients_new_leader_elected, p_to_replicas) = paxos_core( + proposers, + acceptors, + r_to_acceptors_checkpoint.broadcast_bincode(acceptors), + c_to_proposers, + f, + i_am_leader_send_timeout, + i_am_leader_check_timeout, + i_am_leader_check_timeout_delay_multiplier, + ); + + let (r_to_acceptors_checkpoint_new, r_new_processed_payloads) = replica( + replicas, + p_to_replicas + .map(q!(|(slot, kv)| SequencedKv { seq: slot, kv })) + .broadcast_bincode_interleaved(replicas), + checkpoint_frequency, + ); + + r_to_acceptors_checkpoint_complete_cycle.complete(r_to_acceptors_checkpoint_new); + + (p_to_clients_new_leader_elected, r_new_processed_payloads) +} + +// Replicas. All relations for replicas will be prefixed with r. Expects ReplicaPayload on p_to_replicas, outputs a stream of (client address, ReplicaPayload) after processing. +#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] +pub fn replica<'a>( + replicas: &Cluster<'a, Replica>, + p_to_replicas: Stream>, + checkpoint_frequency: usize, +) -> ( + Stream>, + Stream>, +) { + let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = replicas.tick_cycle(); + // p_to_replicas.inspect(q!(|payload: ReplicaPayload| println!("Replica received payload: {:?}", payload))); + let r_sorted_payloads = p_to_replicas + .clone() + .tick_batch() + .union(r_buffered_payloads) // Combine with all payloads that we've received and not processed yet + .sort(); + // Create a cycle since we'll use this seq before we define it + let (r_highest_seq_complete_cycle, r_highest_seq) = + replicas.tick_cycle::>(); + let empty_slot = replicas.singleton_first_tick(q!(-1)); + // Either the max sequence number executed so far or -1. Need to union otherwise r_highest_seq is empty and joins with it will fail + let r_highest_seq_with_default = r_highest_seq.union(empty_slot); + // Find highest the sequence number of any payload that can be processed in this tick. This is the payload right before a hole. + let r_highest_seq_processable_payload = r_sorted_payloads + .clone() + .cross_singleton(r_highest_seq_with_default) + .fold( + q!(|| -1), + q!(|filled_slot, (sorted_payload, highest_seq)| { + // Note: This function only works if the input is sorted on seq. + let next_slot = std::cmp::max(*filled_slot, highest_seq); + + *filled_slot = if sorted_payload.seq == next_slot + 1 { + sorted_payload.seq + } else { + *filled_slot + }; + }), + ); + // Find all payloads that can and cannot be processed in this tick. + let r_processable_payloads = r_sorted_payloads + .clone() + .cross_singleton(r_highest_seq_processable_payload.clone()) + .filter(q!( + |(sorted_payload, highest_seq)| sorted_payload.seq <= *highest_seq + )) + .map(q!(|(sorted_payload, _)| { sorted_payload })); + let r_new_non_processable_payloads = r_sorted_payloads + .clone() + .cross_singleton(r_highest_seq_processable_payload.clone()) + .filter(q!( + |(sorted_payload, highest_seq)| sorted_payload.seq > *highest_seq + )) + .map(q!(|(sorted_payload, _)| { sorted_payload })); + // Save these, we can process them once the hole has been filled + r_buffered_payloads_complete_cycle.complete_next_tick(r_new_non_processable_payloads); + + let r_kv_store = r_processable_payloads + .clone() + .persist() // Optimization: all_ticks() + fold() = fold, where the state of the previous fold is saved and persisted values are deleted. + .fold(q!(|| (HashMap::::new(), -1)), q!(|state, payload| { + let kv_store = &mut state.0; + let last_seq = &mut state.1; + kv_store.insert(payload.kv.key, payload.kv.value); + debug_assert!(payload.seq == *last_seq + 1, "Hole in log between seq {} and {}", *last_seq, payload.seq); + *last_seq = payload.seq; + // println!("Replica kv store: {:?}", kv_store); + })); + // Update the highest seq for the next tick + let r_new_highest_seq = r_kv_store.map(q!(|(_kv_store, highest_seq)| highest_seq)); + r_highest_seq_complete_cycle.complete_next_tick(r_new_highest_seq.clone().into()); + + // Send checkpoints to the acceptors when we've processed enough payloads + let (r_checkpointed_seqs_complete_cycle, r_checkpointed_seqs) = + replicas.tick_cycle::>(); + let r_max_checkpointed_seq = r_checkpointed_seqs + .persist() + .max() + .unwrap_or(replicas.singleton(q!(-1)).latest_tick()); + let r_checkpoint_seq_new = r_max_checkpointed_seq + .cross_singleton(r_new_highest_seq) + .filter_map(q!( + move |(max_checkpointed_seq, new_highest_seq)| if new_highest_seq - max_checkpointed_seq + >= checkpoint_frequency as i32 + { + Some(new_highest_seq) + } else { + None + } + )); + r_checkpointed_seqs_complete_cycle.complete_next_tick(r_checkpoint_seq_new.clone()); + + // Tell clients that the payload has been committed. All ReplicaPayloads contain the client's machine ID (to string) as value. + let r_to_clients = p_to_replicas; + (r_checkpoint_seq_new.all_ticks(), r_to_clients) +} diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index f1f81755fdd..6103176aa6d 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -10,7 +10,7 @@ expression: built.ir() { use crate :: __staged :: cluster :: paxos :: * ; ["Proposers say hello"] }, ), location_kind: Cluster( - 2, + 0, ), }, }, @@ -21,7 +21,7 @@ expression: built.ir() { use crate :: __staged :: cluster :: paxos :: * ; ["Acceptors say hello"] }, ), location_kind: Cluster( - 3, + 1, ), }, }, @@ -30,11 +30,11 @@ expression: built.ir() sym: cycle_4, }, location_kind: Cluster( - 2, + 0, ), input: DeferTick( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , u32 > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (received_max_ballot , ballot_num) | { if received_max_ballot > (Ballot { num : ballot_num , proposer_id : p_id , }) { received_max_ballot . num + 1 } else { ballot_num } } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , u32 > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (received_max_ballot , ballot_num) | { if received_max_ballot > (Ballot { num : ballot_num , proposer_id : p_id , }) { received_max_ballot . num + 1 } else { ballot_num } } }), input: CrossSingleton( Tee { inner: : Union( @@ -44,24 +44,24 @@ expression: built.ir() Union( Union( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | p1a . max_ballot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | p1a . max_ballot }), input: CycleSource { ident: Ident { sym: cycle_1, }, location_kind: Cluster( - 2, + 0, ), }, }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | p2b . max_ballot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | p2b . max_ballot }), input: CycleSource { ident: Ident { sym: cycle_0, }, location_kind: Cluster( - 2, + 0, ), }, }, @@ -71,7 +71,7 @@ expression: built.ir() sym: cycle_2, }, location_kind: Cluster( - 2, + 0, ), }, ), @@ -83,7 +83,7 @@ expression: built.ir() { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } } ; [e] }, ), location_kind: Cluster( - 2, + 0, ), }, ), @@ -96,7 +96,7 @@ expression: built.ir() sym: cycle_4, }, location_kind: Cluster( - 2, + 0, ), }, Persist( @@ -105,7 +105,7 @@ expression: built.ir() { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; 0 } ; [e] }, ), location_kind: Cluster( - 2, + 0, ), }, ), @@ -120,18 +120,18 @@ expression: built.ir() sym: cycle_2, }, location_kind: Cluster( - 2, + 0, ), input: Tee { inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( - 2, + 0, ), from_key: None, to_location: Cluster( - 2, + 0, ), to_key: None, serialize_pipeline: Some( @@ -156,9 +156,9 @@ expression: built.ir() ), ), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > > (__hydroflow_plus_cluster_ids_2) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > > (__hydroflow_plus_cluster_ids_0) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ballot_num | Ballot { num : ballot_num , proposer_id : p_id } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | ballot_num | Ballot { num : ballot_num , proposer_id : p_id } }), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( @@ -176,7 +176,7 @@ expression: built.ir() sym: cycle_3, }, location_kind: Cluster( - 2, + 0, ), }, }, @@ -190,7 +190,7 @@ expression: built.ir() stageleft :: runtime_support :: type_hint :: < core :: time :: Duration > ({ use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_send_timeout = 1u64 ; Duration :: from_secs (i_am_leader_send_timeout) }), ), location_kind: Cluster( - 2, + 0, ), }, }, @@ -207,18 +207,18 @@ expression: built.ir() sym: cycle_1, }, location_kind: Cluster( - 2, + 0, ), input: Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > >) , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > >) , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( - 3, + 1, ), from_key: None, to_location: Cluster( - 2, + 0, ), to_key: None, serialize_pipeline: Some( @@ -226,7 +226,7 @@ expression: built.ir() Operator { path: "map", args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > > (& data) . unwrap () . into ()) }", + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > > (& data) . unwrap () . into ()) }", ], }, ), @@ -237,13 +237,13 @@ expression: built.ir() Operator { path: "map", args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > > (& b) . unwrap ()) }", + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > > (& b) . unwrap ()) }", ], }, ), ), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot) , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((p1a , max_ballot) , log) | (p1a . ballot . proposer_id , P1b { ballot : p1a . ballot , max_ballot , accepted : log }) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot) , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > >) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((p1a , max_ballot) , log) | (p1a . ballot . proposer_id , P1b { ballot : p1a . ballot , max_ballot , accepted : log }) }), input: CrossSingleton( CrossSingleton( Tee { @@ -251,11 +251,11 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1a) , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( - 2, + 0, ), from_key: None, to_location: Cluster( - 3, + 1, ), to_key: None, serialize_pipeline: Some( @@ -280,11 +280,11 @@ expression: built.ir() ), ), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_1) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), input: Inspect { f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | _ | println ! ("Proposer leader expired, sending P1a") }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ballot_num | P1a { ballot : Ballot { num : ballot_num , proposer_id : p_id } } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | ballot_num | P1a { ballot : Ballot { num : ballot_num , proposer_id : p_id } } }), input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( @@ -330,10 +330,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), input: Source { source: Stream( - { use hydroflow_plus :: __staged :: location :: * ; let delay = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout_delay_multiplier = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; Duration :: from_secs ((p_id . raw_id * i_am_leader_check_timeout_delay_multiplier as u32) . into ()) } ; let interval = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; Duration :: from_secs (i_am_leader_check_timeout) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval_at (tokio :: time :: Instant :: now () + delay , interval)) }, + { use hydroflow_plus :: __staged :: location :: * ; let delay = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout_delay_multiplier = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; Duration :: from_secs ((p_id . raw_id * i_am_leader_check_timeout_delay_multiplier as u32) . into ()) } ; let interval = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; Duration :: from_secs (i_am_leader_check_timeout) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval_at (tokio :: time :: Instant :: now () + delay , interval)) }, ), location_kind: Cluster( - 2, + 0, ), }, }, @@ -370,7 +370,7 @@ expression: built.ir() { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } } ; [e] }, ), location_kind: Cluster( - 3, + 1, ), }, ), @@ -378,13 +378,13 @@ expression: built.ir() }, ), Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >) , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ckpnt , log) | log . clone () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > >) , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ckpnt , log) | log . clone () }), input: CycleSource { ident: Ident { sym: cycle_0, }, location_kind: Cluster( - 3, + 1, ), }, }, @@ -399,7 +399,7 @@ expression: built.ir() sym: cycle_3, }, location_kind: Cluster( - 2, + 0, ), input: Tee { inner: : Map { @@ -409,16 +409,16 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received > f { Some (true) } else { None } }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > , u32) , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > , u32) , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }), input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (p1b , ballot_num) | p1b . ballot . num == * ballot_num && p1b . ballot . proposer_id == p_id }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (p1b , ballot_num) | p1b . ballot . num == * ballot_num && p1b . ballot . proposer_id == p_id }), input: CrossSingleton( Persist( Inspect { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | println ! ("Proposer received P1b: {:?}" , p1b) }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | println ! ("Proposer received P1b: {:?}" , p1b) }), input: Tee { inner: , }, @@ -437,7 +437,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), input: Tee { inner: : Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (received_max_ballot , ballot_num) | * received_max_ballot <= Ballot { num : * ballot_num , proposer_id : p_id } }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (received_max_ballot , ballot_num) | * received_max_ballot <= Ballot { num : * ballot_num , proposer_id : p_id } }), input: CrossSingleton( Tee { inner: , @@ -458,7 +458,7 @@ expression: built.ir() sym: cycle_5, }, location_kind: Cluster( - 2, + 0, ), input: DeferTick( Map { @@ -475,13 +475,13 @@ expression: built.ir() inner: : Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), input: Tee { inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , LogValue { ballot : Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } , value : Default :: default () }) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_entry , new_entry | { let same_values = new_entry . value == curr_entry . 1 . value ; let higher_ballot = new_entry . ballot > curr_entry . 1 . ballot ; if same_values { curr_entry . 0 += 1 ; } if higher_ballot { curr_entry . 1 . ballot = new_entry . ballot ; if ! same_values { curr_entry . 0 = 1 ; curr_entry . 1 . value = new_entry . value ; } } } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , LogValue { ballot : Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } , value : Default :: default () }) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_entry , new_entry | { let same_values = new_entry . value == curr_entry . 1 . value ; let higher_ballot = new_entry . ballot > curr_entry . 1 . ballot ; if same_values { curr_entry . 0 += 1 ; } if higher_ballot { curr_entry . 1 . ballot = new_entry . ballot ; if ! same_values { curr_entry . 0 = 1 ; curr_entry . 1 . value = new_entry . value ; } } } }), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > , std :: collections :: hash_map :: IntoIter < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | p1b . accepted . into_iter () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > , std :: collections :: hash_map :: IntoIter < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | p1b . accepted . into_iter () }), input: Tee { inner: , }, @@ -498,7 +498,7 @@ expression: built.ir() { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; 0 } ; [e] }, ), location_kind: Cluster( - 2, + 0, ), }, ), @@ -516,7 +516,7 @@ expression: built.ir() sym: cycle_5, }, location_kind: Cluster( - 2, + 0, ), }, }, @@ -531,22 +531,22 @@ expression: built.ir() Tee { inner: : Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , i32) , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (((index , payload) , next_slot) , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot : next_slot + index as i32 , value : payload } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) , i32) , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (((index , payload) , next_slot) , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot : next_slot + index as i32 , value : payload } }), input: CrossSingleton( CrossSingleton( Enumerate( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( - 0, + 2, ), from_key: None, to_location: Cluster( - 2, + 0, ), to_key: None, serialize_pipeline: Some( @@ -554,7 +554,7 @@ expression: built.ir() Operator { path: "map", args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > (& data) . unwrap () . into ()) }", + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > (& data) . unwrap () . into ()) }", ], }, ), @@ -565,17 +565,17 @@ expression: built.ir() Operator { path: "map", args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > (& b) . unwrap ()) }", + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > (& b) . unwrap ()) }", ], }, ), ), input: CycleSource { ident: Ident { - sym: cycle_0, + sym: cycle_1, }, location_kind: Cluster( - 0, + 2, ), }, }, @@ -614,33 +614,33 @@ expression: built.ir() sym: cycle_6, }, location_kind: Cluster( - 2, + 0, ), input: DeferTick( Difference( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _value) | slot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _value) | slot }), input: Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload)) , core :: option :: Option < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , value)) | if count > f { Some ((slot , value)) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload)) , core :: option :: Option < (i32 , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , value)) | if count > f { Some ((slot , value)) } else { None } }), input: Tee { inner: : FoldKeyed { - init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , Default :: default ()) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | accum , value | { accum . 0 += 1 ; accum . 1 = value ; } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , Default :: default ()) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | accum , value | { accum . 0 += 1 ; accum . 1 = value ; } }), input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , core :: option :: Option < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | if p2b . ballot == p2b . max_ballot { Some (((p2b . slot , p2b . ballot) , p2b . value)) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , core :: option :: Option < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | if p2b . ballot == p2b . max_ballot { Some (((p2b . slot , p2b . ballot) , p2b . value)) } else { None } }), input: Tee { inner: : Union( Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( - 3, + 1, ), from_key: None, to_location: Cluster( - 2, + 0, ), to_key: None, serialize_pipeline: Some( @@ -648,7 +648,7 @@ expression: built.ir() Operator { path: "map", args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }", + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > (& data) . unwrap () . into ()) }", ], }, ), @@ -659,24 +659,24 @@ expression: built.ir() Operator { path: "map", args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }", + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > (& b) . unwrap ()) }", ], }, ), ), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | (p2a . ballot . proposer_id , P2b { ballot : p2a . ballot , max_ballot , slot : p2a . slot , value : p2a . value }) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | (p2a . ballot . proposer_id , P2b { ballot : p2a . ballot , max_ballot , slot : p2a . slot , value : p2a . value }) }), input: CrossSingleton( Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( - 2, + 0, ), from_key: None, to_location: Cluster( - 3, + 1, ), to_key: None, serialize_pipeline: Some( @@ -684,7 +684,7 @@ expression: built.ir() Operator { path: "map", args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }", + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > (& data) . unwrap () . into ()) }", ], }, ), @@ -695,23 +695,23 @@ expression: built.ir() Operator { path: "map", args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }", + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > (& b) . unwrap ()) }", ], }, ), ), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_1) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), input: CrossSingleton( Union( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), input: CrossSingleton( Union( FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ((slot , (count , entry)) , ballot_num) | if count <= f as u32 { Some (P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id , } , slot , value : entry . value , }) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | ((slot , (count , entry)) , ballot_num) | if count <= f as u32 { Some (P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id , } , slot , value : entry . value , }) } else { None } }), input: CrossSingleton( Tee { inner: , @@ -722,7 +722,7 @@ expression: built.ir() ), }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (slot , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot , value : Default :: default () } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (slot , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot , value : Default :: default () } }), input: CrossSingleton( Difference( FlatMap { @@ -732,7 +732,7 @@ expression: built.ir() }, }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), input: Tee { inner: , }, @@ -803,7 +803,7 @@ expression: built.ir() sym: cycle_7, }, location_kind: Cluster( - 2, + 0, ), }, ), @@ -816,7 +816,7 @@ expression: built.ir() }, Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload)) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , _p2b)) | if count == 2 * f + 1 { Some (slot) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload)) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , _p2b)) | if count == 2 * f + 1 { Some (slot) } else { None } }), input: Tee { inner: , }, @@ -830,14 +830,14 @@ expression: built.ir() sym: cycle_7, }, location_kind: Cluster( - 2, + 0, ), input: DeferTick( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_slot , p2b) | p2b }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_slot , p2b) | p2b }), input: AntiJoin( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | (p2b . slot , p2b) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | (p2b . slot , p2b) }), input: Tee { inner: , }, @@ -854,15 +854,15 @@ expression: built.ir() sym: cycle_0, }, location_kind: Cluster( - 3, + 1, ), input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (- 1 , HashMap :: new ()) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >) , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , checkpoint_or_p2a | { match checkpoint_or_p2a { CheckpointOrP2a :: Checkpoint (new_checkpoint) => { for slot in * prev_checkpoint .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = new_checkpoint ; } CheckpointOrP2a :: P2a (p2a) => { if p2a . slot > * prev_checkpoint && log . get (& p2a . slot) . map (| prev_p2a : & LogValue < _ > | p2a . ballot > prev_p2a . ballot) . unwrap_or (true) { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (- 1 , HashMap :: new ()) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > >) , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , checkpoint_or_p2a | { match checkpoint_or_p2a { CheckpointOrP2a :: Checkpoint (new_checkpoint) => { for slot in * prev_checkpoint .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = new_checkpoint ; } CheckpointOrP2a :: P2a (p2a) => { if p2a . slot > * prev_checkpoint && log . get (& p2a . slot) . map (| prev_p2a : & LogValue < _ > | p2a . ballot > prev_p2a . ballot) . unwrap_or (true) { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } } }), input: Persist( Union( FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some (CheckpointOrP2a :: P2a (p2a)) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some (CheckpointOrP2a :: P2a (p2a)) } else { None } }), input: CrossSingleton( Tee { inner: , @@ -873,15 +873,15 @@ expression: built.ir() ), }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; | min_seq | CheckpointOrP2a :: Checkpoint (min_seq) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; | min_seq | CheckpointOrP2a :: Checkpoint (min_seq) }), input: Delta( Union( Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new < * curr { * curr = new ; } } }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_sender , seq) | seq }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_sender , seq) | seq }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , ()) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , i32) , ()) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , i32) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), input: CrossSingleton( Tee { inner: : ReduceKeyed { @@ -889,11 +889,11 @@ expression: built.ir() input: Persist( Network { from_location: Cluster( - 1, + 3, ), from_key: None, to_location: Cluster( - 3, + 1, ), to_key: None, serialize_pipeline: Some( @@ -912,19 +912,19 @@ expression: built.ir() Operator { path: "map", args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < i32 > (& b) . unwrap ()) }", + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < i32 > (& b) . unwrap ()) }", ], }, ), ), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_1) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), input: CycleSource { ident: Ident { sym: cycle_0, }, location_kind: Cluster( - 1, + 3, ), }, }, @@ -938,7 +938,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received == f + 1 { Some (true) } else { None } }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , i32) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { inner: , }, @@ -955,7 +955,7 @@ expression: built.ir() { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; - 1 } ; [e] }, ), location_kind: Cluster( - 3, + 1, ), }, ), @@ -971,7 +971,7 @@ expression: built.ir() sym: cycle_0, }, location_kind: Cluster( - 2, + 0, ), input: Tee { inner: , @@ -982,27 +982,27 @@ expression: built.ir() sym: cycle_1, }, location_kind: Cluster( - 1, + 3, ), input: DeferTick( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , _) | { sorted_payload } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , i32) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , _) | { sorted_payload } }), input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq > * highest_seq }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq > * highest_seq }), input: CrossSingleton( Tee { inner: : Sort( Union( Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( - 2, + 0, ), from_key: None, to_location: Cluster( - 1, + 3, ), to_key: None, serialize_pipeline: Some( @@ -1010,7 +1010,7 @@ expression: built.ir() Operator { path: "map", args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > (& data) . unwrap () . into ()) }", + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > (& data) . unwrap () . into ()) }", ], }, ), @@ -1021,15 +1021,15 @@ expression: built.ir() Operator { path: "map", args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > (& b) . unwrap ()) }", + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > (& b) . unwrap ()) }", ], }, ), ), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > > > (__hydroflow_plus_cluster_ids_1) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (slot , data) | ReplicaPayload { seq : slot , key : data . key , value : data . value } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (slot , kv) | SequencedKv { seq : slot , kv } }), input: AntiJoin( Tee { inner: , @@ -1039,7 +1039,7 @@ expression: built.ir() sym: cycle_6, }, location_kind: Cluster( - 2, + 0, ), }, ), @@ -1053,7 +1053,7 @@ expression: built.ir() sym: cycle_1, }, location_kind: Cluster( - 1, + 3, ), }, ), @@ -1061,8 +1061,8 @@ expression: built.ir() }, Tee { inner: : Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < i32 > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | - 1 }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | filled_slot , (sorted_payload , highest_seq) | { let next_slot = std :: cmp :: max (* filled_slot , highest_seq) ; * filled_slot = if sorted_payload . seq == next_slot + 1 { sorted_payload . seq } else { * filled_slot } ; } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < i32 > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | | - 1 }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , i32) , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | filled_slot , (sorted_payload , highest_seq) | { let next_slot = std :: cmp :: max (* filled_slot , highest_seq) ; * filled_slot = if sorted_payload . seq == next_slot + 1 { sorted_payload . seq } else { * filled_slot } ; } }), input: CrossSingleton( Tee { inner: , @@ -1073,15 +1073,15 @@ expression: built.ir() sym: cycle_2, }, location_kind: Cluster( - 1, + 3, ), }, Source { source: Iter( - { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos_bench :: * ; - 1 } ; [e] }, + { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos_kv :: * ; - 1 } ; [e] }, ), location_kind: Cluster( - 1, + 3, ), }, ), @@ -1098,21 +1098,21 @@ expression: built.ir() sym: cycle_2, }, location_kind: Cluster( - 1, + 3, ), input: DeferTick( Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_kv_store , highest_seq) | highest_seq }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (_kv_store , highest_seq) | highest_seq }), input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | (HashMap :: < u32 , String > :: new () , - 1) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | state , payload | { let kv_store = & mut state . 0 ; let last_seq = & mut state . 1 ; kv_store . insert (payload . key , payload . value) ; debug_assert ! (payload . seq == * last_seq + 1 , "Hole in log between seq {} and {}" , * last_seq , payload . seq) ; * last_seq = payload . seq ; } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | | (HashMap :: < u32 , String > :: new () , - 1) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | state , payload | { let kv_store = & mut state . 0 ; let last_seq = & mut state . 1 ; kv_store . insert (payload . kv . key , payload . kv . value) ; debug_assert ! (payload . seq == * last_seq + 1 , "Hole in log between seq {} and {}" , * last_seq , payload . seq) ; * last_seq = payload . seq ; } }), input: Persist( Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , _) | { sorted_payload } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , i32) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , _) | { sorted_payload } }), input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq <= * highest_seq }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq <= * highest_seq }), input: CrossSingleton( Tee { inner: , @@ -1135,12 +1135,12 @@ expression: built.ir() sym: cycle_3, }, location_kind: Cluster( - 1, + 3, ), input: DeferTick( Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , i32) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let checkpoint_frequency = 1usize ; move | (max_checkpointed_seq , new_highest_seq) | if new_highest_seq - max_checkpointed_seq >= checkpoint_frequency as i32 { Some (new_highest_seq) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , i32) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; let checkpoint_frequency = 1usize ; move | (max_checkpointed_seq , new_highest_seq) | if new_highest_seq - max_checkpointed_seq >= checkpoint_frequency as i32 { Some (new_highest_seq) } else { None } }), input: CrossSingleton( Union( Reduce { @@ -1151,7 +1151,7 @@ expression: built.ir() sym: cycle_3, }, location_kind: Cluster( - 1, + 3, ), }, ), @@ -1159,10 +1159,10 @@ expression: built.ir() Persist( Source { source: Iter( - { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos_bench :: * ; - 1 } ; [e] }, + { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos_kv :: * ; - 1 } ; [e] }, ), location_kind: Cluster( - 1, + 3, ), }, ), @@ -1180,7 +1180,7 @@ expression: built.ir() sym: cycle_0, }, location_kind: Cluster( - 1, + 3, ), input: Tee { inner: , @@ -1188,24 +1188,75 @@ expression: built.ir() }, CycleSink { ident: Ident { - sym: cycle_1, + sym: cycle_0, }, location_kind: Cluster( - 0, + 2, + ), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , ()) , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (leader_id , _) | leader_id }), + input: Network { + from_location: Cluster( + 0, + ), + from_key: None, + to_location: Cluster( + 2, + ), + to_key: None, + serialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , ()) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < () > (& data) . unwrap () . into ()) }", + ], + }, + ), + ), + instantiate_fn: , + deserialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < () > (& b) . unwrap ()) }", + ], + }, + ), + ), + input: FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < () , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (__hydroflow_plus_cluster_ids_2) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use crate :: __staged :: cluster :: paxos :: * ; move | _ | () }), + input: Tee { + inner: , + }, + }, + }, + }, + }, + }, + CycleSink { + ident: Ident { + sym: cycle_2, + }, + location_kind: Cluster( + 2, ), input: DeferTick( AntiJoin( Tee { inner: : Union( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) , (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sender , replica_payload) | (replica_payload . key , sender) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv) , (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sender , replica_payload) | (replica_payload . kv . key , sender) }), input: Network { from_location: Cluster( - 1, + 3, ), from_key: None, to_location: Cluster( - 0, + 2, ), to_key: None, serialize_pipeline: Some( @@ -1213,7 +1264,7 @@ expression: built.ir() Operator { path: "map", args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > (& data) . unwrap () . into ()) }", + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > (& data) . unwrap () . into ()) }", ], }, ), @@ -1224,13 +1275,13 @@ expression: built.ir() Operator { path: "map", args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > (& b) . unwrap ()) }", + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv > (& b) . unwrap ()) }", ], }, ), ), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (ClusterId :: from_raw (payload . value . parse :: < u32 > () . unwrap ()) , payload) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (ClusterId :: from_raw (payload . kv . value . parse :: < u32 > () . unwrap ()) , payload) }), input: Tee { inner: , }, @@ -1239,10 +1290,10 @@ expression: built.ir() }, CycleSource { ident: Ident { - sym: cycle_1, + sym: cycle_2, }, location_kind: Cluster( - 0, + 2, ), }, ), @@ -1252,7 +1303,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , usize) , core :: option :: Option < u32 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let f = 1usize ; move | (key , count) | { if count == f + 1 { Some (key) } else { None } } }), input: FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | 0 }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | curr_count , _sender | { * curr_count += 1 ; } }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | curr_count , _sender | { * curr_count += 1 ; } }), input: Tee { inner: , }, @@ -1264,10 +1315,56 @@ expression: built.ir() }, CycleSink { ident: Ident { - sym: cycle_2, + sym: cycle_1, }, location_kind: Cluster( - 0, + 2, + ), + input: Union( + FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; let num_clients_per_node = 1usize ; move | leader_ballot | (0 .. num_clients_per_node) . map (move | i | (leader_ballot , KvPayload { key : i as u32 , value : c_id . raw_id . to_string () })) }), + input: Tee { + inner: : Delta( + Tee { + inner: : Reduce { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), + input: Persist( + Inspect { + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | ballot | println ! ("Client notified that leader was elected: {:?}" , ballot) }), + input: CycleSource { + ident: Ident { + sym: cycle_0, + }, + location_kind: Cluster( + 2, + ), + }, + }, + ), + }, + }, + ), + }, + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer >) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (key , cur_leader) | (cur_leader , KvPayload { key , value : c_id . raw_id . to_string () }) }), + input: CrossSingleton( + Tee { + inner: , + }, + Tee { + inner: , + }, + ), + }, + ), + }, + CycleSink { + ident: Ident { + sym: cycle_3, + }, + location_kind: Cluster( + 2, ), input: DeferTick( ReduceKeyed { @@ -1275,12 +1372,12 @@ expression: built.ir() input: Union( Union( Tee { - inner: : CycleSource { + inner: : CycleSource { ident: Ident { - sym: cycle_2, + sym: cycle_3, }, location_kind: Cluster( - 0, + 2, ), }, }, @@ -1289,61 +1386,7 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , std :: time :: SystemTime > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | SystemTime :: now () }), input: Tee { - inner: : Delta( - Tee { - inner: : Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), - input: Persist( - Inspect { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | ballot | println ! ("Client notified that leader was elected: {:?}" , ballot) }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , ()) , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (leader_id , _) | leader_id }), - input: Network { - from_location: Cluster( - 2, - ), - from_key: None, - to_location: Cluster( - 0, - ), - to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , ()) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < () > (& data) . unwrap () . into ()) }", - ], - }, - ), - ), - instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < () > (& b) . unwrap ()) }", - ], - }, - ), - ), - input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < () , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (__hydroflow_plus_cluster_ids_0) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use crate :: __staged :: cluster :: paxos :: * ; move | _ | () }), - input: Tee { - inner: , - }, - }, - }, - }, - }, - }, - ), - }, - }, - ), + inner: , }, }, }, @@ -1379,7 +1422,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (std :: time :: SystemTime , std :: time :: SystemTime)) , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time) . unwrap () . as_micros ()) }), input: Join( Tee { - inner: , + inner: , }, Tee { inner: , @@ -1388,14 +1431,14 @@ expression: built.ir() }, DeferTick( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < () , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }), + f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }), input: Tee { inner: : Source { - source: Interval( - { use crate :: __staged :: cluster :: paxos_bench :: * ; Duration :: from_secs (1) }, + source: Stream( + { use hydroflow_plus :: __staged :: location :: * ; let interval = { use crate :: __staged :: cluster :: paxos_bench :: * ; Duration :: from_secs (1) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval (interval)) }, ), location_kind: Cluster( - 0, + 2, ), }, }, @@ -1429,7 +1472,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { inner: , }, @@ -1441,7 +1484,7 @@ expression: built.ir() }, DeferTick( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < () , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | (0 , true) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | (0 , true) }), input: Tee { inner: , }, @@ -1452,7 +1495,7 @@ expression: built.ir() }, ), Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), input: Tee { inner: , }, @@ -1460,31 +1503,4 @@ expression: built.ir() ), }, }, - CycleSink { - ident: Ident { - sym: cycle_0, - }, - location_kind: Cluster( - 0, - ), - input: Union( - FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; let num_clients_per_node = 1usize ; move | leader_ballot | (0 .. num_clients_per_node) . map (move | i | (leader_ballot , ClientPayload { key : i as u32 , value : c_id . raw_id . to_string () })) }), - input: Tee { - inner: , - }, - }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer >) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (key , leader_ballot) | (leader_ballot , ClientPayload { key , value : c_id . raw_id . to_string () }) }), - input: CrossSingleton( - Tee { - inner: , - }, - Tee { - inner: , - }, - ), - }, - ), - }, ]