diff --git a/hydroflow_plus/src/lib.rs b/hydroflow_plus/src/lib.rs index 9bd5b086d87..b0ca79be28c 100644 --- a/hydroflow_plus/src/lib.rs +++ b/hydroflow_plus/src/lib.rs @@ -50,8 +50,22 @@ pub struct RuntimeContext<'a> { _phantom: PhantomData<&'a mut &'a ()>, } +impl RuntimeContext<'_> { + pub fn new() -> Self { + Self { + _phantom: PhantomData, + } + } +} + impl Copy for RuntimeContext<'_> {} +impl Default for RuntimeContext<'_> { + fn default() -> Self { + Self::new() + } +} + impl<'a> FreeVariable<&'a Context> for RuntimeContext<'a> { fn to_tokens(self) -> (Option, Option) { (None, Some(quote!(&context))) @@ -68,6 +82,10 @@ impl HfCompiled<'_, ID> { pub fn hydroflow_ir(&self) -> &BTreeMap { &self.hydroflow_ir } + + pub fn take_ir(self) -> BTreeMap { + self.hydroflow_ir + } } impl<'a> HfCompiled<'a, usize> { diff --git a/hydroflow_plus/src/location.rs b/hydroflow_plus/src/location.rs index 3036a0b74ec..3e0a88f7047 100644 --- a/hydroflow_plus/src/location.rs +++ b/hydroflow_plus/src/location.rs @@ -24,11 +24,23 @@ pub enum LocationId { ExternalProcess(usize), } +impl LocationId { + pub fn raw_id(&self) -> usize { + match self { + LocationId::Process(id) => *id, + LocationId::Cluster(id) => *id, + LocationId::ExternalProcess(id) => *id, + } + } +} + pub trait Location<'a> { fn id(&self) -> LocationId; fn flow_state(&self) -> &FlowState; + fn make_from(id: LocationId, flow_state: FlowState) -> Self; + fn spin(&self) -> Stream<(), Unbounded, Self> where Self: Sized + NoTick, @@ -346,6 +358,17 @@ impl<'a, P> Location<'a> for ExternalProcess<'a, P> { fn flow_state(&self) -> &FlowState { &self.flow_state } + + fn make_from(id: LocationId, flow_state: FlowState) -> Self { + match id { + LocationId::ExternalProcess(id) => ExternalProcess { + id, + flow_state, + _phantom: PhantomData, + }, + _ => panic!(), + } + } } impl<'a, P> ExternalProcess<'a, P> { @@ -447,6 +470,17 @@ impl<'a, P> Location<'a> for Process<'a, P> { fn flow_state(&self) -> &FlowState { &self.flow_state } + + fn make_from(id: LocationId, flow_state: FlowState) -> Self { + match id { + LocationId::Process(id) => Process { + id, + flow_state, + _phantom: PhantomData, + }, + _ => panic!(), + } + } } #[repr(transparent)] @@ -581,6 +615,17 @@ impl<'a, C> Location<'a> for Cluster<'a, C> { fn flow_state(&self) -> &FlowState { &self.flow_state } + + fn make_from(id: LocationId, flow_state: FlowState) -> Self { + match id { + LocationId::Cluster(id) => Cluster { + id, + flow_state, + _phantom: PhantomData, + }, + _ => panic!(), + } + } } pub trait CanSend<'a, To: Location<'a>>: Location<'a> { diff --git a/hydroflow_plus/src/singleton.rs b/hydroflow_plus/src/singleton.rs index d9db34a831c..855e5860cad 100644 --- a/hydroflow_plus/src/singleton.rs +++ b/hydroflow_plus/src/singleton.rs @@ -260,6 +260,15 @@ impl<'a, T, W, N: Location<'a>> Singleton { } impl<'a, T, N: Location<'a>> Singleton> { + // TODO(shadaj): this is technically incorrect; we should only return the first element of the stream + pub fn into_stream(self) -> Stream> { + Stream::new( + self.location_kind, + self.flow_state, + self.ir_node.into_inner(), + ) + } + pub fn cross_singleton(self, other: Other) -> >::Out where Self: CrossResult<'a, Other>, @@ -641,6 +650,16 @@ impl<'a, T, N: Location<'a>> Optional> { ), ) } + + pub fn into_singleton(self) -> Singleton, Bounded, Tick> + where + T: Clone, + N: NoTick, + { + let self_location = N::make_from(self.location_kind, self.flow_state.clone()); + self.map(q!(|v| Some(v))) + .unwrap_or(self_location.singleton_each_tick(q!(None))) + } } impl<'a, T, N: Location<'a>> Optional> { @@ -730,6 +749,15 @@ impl<'a, T, B, N: Location<'a> + NoTick> Optional { self.latest_tick().unwrap_or(other.latest_tick()).latest() } + + pub fn into_singleton(self) -> Singleton, Unbounded, N> + where + T: Clone, + { + let self_location = N::make_from(self.location_kind, self.flow_state.clone()); + self.map(q!(|v| Some(v))) + .unwrap_or(self_location.singleton(q!(None))) + } } impl<'a, T, N: Location<'a> + NoTick> Optional { diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index 55545d46cce..39f664d889d 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -46,6 +46,12 @@ impl<'a, L: Location<'a>> Location<'a> for Tick { fn flow_state(&self) -> &FlowState { self.l.flow_state() } + + fn make_from(id: LocationId, flow_state: FlowState) -> Self { + Tick { + l: L::make_from(id, flow_state), + } + } } /// An infinite stream of elements of type `T`. diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index b75a3957c97..32a5074cc8f 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -42,7 +42,7 @@ struct P1b { #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] struct P2a

{ ballot: Ballot, - slot: i32, + slot: usize, value: Option

, // might be a re-committed hole } @@ -50,7 +50,7 @@ struct P2a

{ struct P2b

{ ballot: Ballot, max_ballot: Ballot, - slot: i32, + slot: usize, value: Option

, // might be a hole } @@ -62,7 +62,7 @@ struct P2b

{ pub fn paxos_core<'a, P: PaxosPayload, R>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, - r_to_acceptors_checkpoint: Stream<(ClusterId, i32), Unbounded, Cluster<'a, Acceptor>>, + r_to_acceptors_checkpoint: Stream<(ClusterId, usize), Unbounded, Cluster<'a, Acceptor>>, c_to_proposers: Stream>, f: usize, i_am_leader_send_timeout: u64, @@ -70,7 +70,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( i_am_leader_check_timeout_delay_multiplier: usize, ) -> ( Stream<(), Unbounded, Cluster<'a, Proposer>>, - Stream<(i32, Option

), Unbounded, Cluster<'a, Proposer>>, + Stream<(usize, Option

), Unbounded, Cluster<'a, Proposer>>, ) { proposers .source_iter(q!(["Proposers say hello"])) @@ -83,7 +83,8 @@ pub fn paxos_core<'a, P: PaxosPayload, R>( let (a_to_proposers_p2b_complete_cycle, a_to_proposers_p2b_forward_reference) = proposers.forward_ref::, _, _>>(); let (a_log_complete_cycle, a_log_forward_reference) = - acceptors.tick_forward_ref::>), _, _>>(); + acceptors + .tick_forward_ref::, HashMap>), _, _>>(); let (p_ballot_num, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election( proposers, @@ -437,12 +438,12 @@ fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>( #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")] fn recommit_after_leader_election<'a, P: PaxosPayload>( proposers: &Cluster<'a, Proposer>, - p_relevant_p1bs: Stream>>, Bounded, Tick>>, + p_relevant_p1bs: Stream>>, Bounded, Tick>>, p_ballot_num: Singleton>>, f: usize, ) -> ( Stream, Bounded, Tick>>, - Optional>>, + Optional>>, Stream, Bounded, Tick>>, ) { let p_id = proposers.self_id(); @@ -520,19 +521,19 @@ fn sequence_payload<'a, P: PaxosPayload, R>( proposers: &Cluster<'a, Proposer>, acceptors: &Cluster<'a, Acceptor>, c_to_proposers: Stream>, - r_to_acceptors_checkpoint: Stream<(ClusterId, i32), Unbounded, Cluster<'a, Acceptor>>, + r_to_acceptors_checkpoint: Stream<(ClusterId, usize), Unbounded, Cluster<'a, Acceptor>>, p_ballot_num: Singleton>>, p_is_leader: Optional>>, - p_max_slot: Optional>>, + p_max_slot: Optional>>, p_log_to_recommit: Stream, Bounded, Tick>>, f: usize, a_max_ballot: Singleton>>, ) -> ( - Stream<(i32, Option

), Unbounded, Cluster<'a, Proposer>>, - Singleton<(i32, HashMap>), Bounded, Tick>>, + Stream<(usize, Option

), Unbounded, Cluster<'a, Proposer>>, + Singleton<(Option, HashMap>), Bounded, Tick>>, Stream, Unbounded, Cluster<'a, Proposer>>, ) { let p_to_acceptors_p2a = p_p2a( @@ -552,7 +553,6 @@ fn sequence_payload<'a, P: PaxosPayload, R>( p_to_acceptors_p2a, r_to_acceptors_checkpoint, proposers, - acceptors, f, ); @@ -563,14 +563,14 @@ fn sequence_payload<'a, P: PaxosPayload, R>( #[derive(Clone)] enum CheckpointOrP2a

{ - Checkpoint(i32), + Checkpoint(usize), P2a(P2a

), } // Proposer logic to send p2as, outputting the next slot and the p2as to send to acceptors. fn p_p2a<'a, P: PaxosPayload>( proposers: &Cluster<'a, Proposer>, - p_max_slot: Optional>>, + p_max_slot: Optional>>, c_to_proposers: Stream>, p_ballot_num: Singleton>>, p_log_to_recommit: Stream, Bounded, Tick>>, @@ -578,7 +578,7 @@ fn p_p2a<'a, P: PaxosPayload>( acceptors: &Cluster<'a, Acceptor>, ) -> Stream, Unbounded, Cluster<'a, Acceptor>> { let p_id = proposers.self_id(); - let (p_next_slot_complete_cycle, p_next_slot) = proposers.tick_cycle::>(); + let (p_next_slot_complete_cycle, p_next_slot) = proposers.tick_cycle::>(); let p_next_slot_after_reconciling_p1bs = p_max_slot .map(q!(|max_slot| max_slot + 1)) .unwrap_or(proposers.singleton_each_tick(q!(0))) @@ -595,7 +595,7 @@ fn p_p2a<'a, P: PaxosPayload>( // .inspect(q!(|ballot_num| println!("{} p_indexed_payloads ballot_num: {}", context.current_tick(), ballot_num)))) .map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { ballot: Ballot { num: ballot_num, proposer_id: p_id }, - slot: next_slot + index as i32, + slot: next_slot + index, value: Some(payload) })); // .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a))); @@ -609,9 +609,7 @@ fn p_p2a<'a, P: PaxosPayload>( let p_next_slot_after_sending_payloads = p_num_payloads .clone() .cross_singleton(p_next_slot.clone()) - .map(q!( - |(num_payloads, next_slot)| next_slot + num_payloads as i32 - )); + .map(q!(|(num_payloads, next_slot)| next_slot + num_payloads)); let p_new_next_slot = p_next_slot_after_reconciling_p1bs // .inspect(q!(|slot| println!("{} p_new_next_slot_after_reconciling_p1bs: {:?}", context.current_tick(), slot))) @@ -627,12 +625,11 @@ fn p_p2a<'a, P: PaxosPayload>( fn acceptor_p2<'a, P: PaxosPayload, R>( a_max_ballot: Singleton>>, p_to_acceptors_p2a: Stream, Unbounded, Cluster<'a, Acceptor>>, - r_to_acceptors_checkpoint: Stream<(ClusterId, i32), Unbounded, Cluster<'a, Acceptor>>, + r_to_acceptors_checkpoint: Stream<(ClusterId, usize), Unbounded, Cluster<'a, Acceptor>>, proposers: &Cluster<'a, Proposer>, - acceptors: &Cluster<'a, Acceptor>, f: usize, ) -> ( - Singleton<(i32, HashMap>), Bounded, Tick>>, + Singleton<(Option, HashMap>), Bounded, Tick>>, Stream, Unbounded, Cluster<'a, Proposer>>, ) { let p_to_acceptors_p2a_batch = p_to_acceptors_p2a.tick_batch(); @@ -658,7 +655,6 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( .continue_if(a_checkpoints_quorum_reached) .map(q!(|(_sender, seq)| seq)) .min() - .unwrap_or(acceptors.singleton_each_tick(q!(-1))) .delta() .map(q!(|min_seq| CheckpointOrP2a::Checkpoint(min_seq))); // .inspect(q!(|(min_seq, p2a): &(i32, P2a)| println!("Acceptor new checkpoint: {:?}", min_seq))); @@ -677,19 +673,19 @@ fn acceptor_p2<'a, P: PaxosPayload, R>( .union(a_new_checkpoint.into_stream()) .persist() .fold( - q!(|| (-1, HashMap::new())), + q!(|| (None, HashMap::new())), q!(|(prev_checkpoint, log), checkpoint_or_p2a| { match checkpoint_or_p2a { CheckpointOrP2a::Checkpoint(new_checkpoint) => { // This is a checkpoint message. Delete all entries up to the checkpoint - for slot in *prev_checkpoint..new_checkpoint { + for slot in (prev_checkpoint.unwrap_or(0))..new_checkpoint { log.remove(&slot); } - *prev_checkpoint = new_checkpoint; + *prev_checkpoint = Some(new_checkpoint); } CheckpointOrP2a::P2a(p2a) => { // This is a regular p2a message. Insert it into the log if it is not checkpointed and has a higher ballot than what was there before - if p2a.slot > *prev_checkpoint + if prev_checkpoint.map(|prev| p2a.slot > prev).unwrap_or(true) && log .get(&p2a.slot) .map(|prev_p2a: &LogValue<_>| p2a.ballot > prev_p2a.ballot) @@ -728,7 +724,7 @@ fn p_p2b<'a, P: PaxosPayload>( proposers: &Cluster<'a, Proposer>, a_to_proposers_p2b: Stream, Unbounded, Cluster<'a, Proposer>>, f: usize, -) -> Stream<(i32, Option

), Unbounded, Cluster<'a, Proposer>> { +) -> Stream<(usize, Option

), Unbounded, Cluster<'a, Proposer>> { let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposers.tick_cycle(); let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposers.tick_cycle(); let p_p2b = a_to_proposers_p2b.tick_batch().union(p_persisted_p2bs); diff --git a/hydroflow_plus_test/src/cluster/paxos_kv.rs b/hydroflow_plus_test/src/cluster/paxos_kv.rs index 55282bd5b50..8cdd3c7db5d 100644 --- a/hydroflow_plus_test/src/cluster/paxos_kv.rs +++ b/hydroflow_plus_test/src/cluster/paxos_kv.rs @@ -26,7 +26,7 @@ pub struct KvPayload { #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] pub struct SequencedKv { // Note: Important that seq is the first member of the struct for sorting - pub seq: i32, + pub seq: usize, pub kv: Option>, } @@ -95,39 +95,36 @@ pub fn replica<'a, K: KvKey, V: KvValue>( p_to_replicas: Stream, Unbounded, Cluster<'a, Replica>>, checkpoint_frequency: usize, ) -> ( - Stream>, + Stream>, Stream, Unbounded, Cluster<'a, Replica>>, ) { let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = replicas.tick_cycle(); // p_to_replicas.inspect(q!(|payload: ReplicaPayload| println!("Replica received payload: {:?}", payload))); let r_sorted_payloads = p_to_replicas - .clone() .tick_batch() .union(r_buffered_payloads) // Combine with all payloads that we've received and not processed yet .sort(); // Create a cycle since we'll use this seq before we define it let (r_highest_seq_complete_cycle, r_highest_seq) = - replicas.tick_cycle::>(); - let empty_slot = replicas.singleton_first_tick(q!(-1)); - // Either the max sequence number executed so far or -1. Need to union otherwise r_highest_seq is empty and joins with it will fail - let r_highest_seq_with_default = r_highest_seq.union(empty_slot); + replicas.tick_cycle::>(); // Find highest the sequence number of any payload that can be processed in this tick. This is the payload right before a hole. let r_highest_seq_processable_payload = r_sorted_payloads .clone() - .cross_singleton(r_highest_seq_with_default) + .cross_singleton(r_highest_seq.into_singleton()) .fold( - q!(|| -1), + q!(|| None), q!(|filled_slot, (sorted_payload, highest_seq)| { - // Note: This function only works if the input is sorted on seq. - let next_slot = std::cmp::max(*filled_slot, highest_seq); - - *filled_slot = if sorted_payload.seq == next_slot + 1 { - sorted_payload.seq - } else { - *filled_slot - }; + let expected_next_slot = std::cmp::max( + filled_slot.map(|v| v + 1).unwrap_or(0), + highest_seq.map(|v| v + 1).unwrap_or(0), + ); + + if sorted_payload.seq == expected_next_slot { + *filled_slot = Some(sorted_payload.seq); + } }), - ); + ) + .filter_map(q!(|v| v)); // Find all payloads that can and cannot be processed in this tick. let r_processable_payloads = r_sorted_payloads .clone() @@ -149,30 +146,28 @@ pub fn replica<'a, K: KvKey, V: KvValue>( let r_kv_store = r_processable_payloads .clone() .persist() // Optimization: all_ticks() + fold() = fold, where the state of the previous fold is saved and persisted values are deleted. - .fold(q!(|| (HashMap::new(), -1)), q!(|(kv_store, last_seq), payload| { + .fold(q!(|| (HashMap::new(), None)), q!(|(kv_store, last_seq), payload| { if let Some(kv) = payload.kv { kv_store.insert(kv.key, kv.value); } - debug_assert!(payload.seq == *last_seq + 1, "Hole in log between seq {} and {}", *last_seq, payload.seq); - *last_seq = payload.seq; - // println!("Replica kv store: {:?}", kv_store); + + debug_assert!(payload.seq == (last_seq.map(|s| s + 1).unwrap_or(0)), "Hole in log between seq {:?} and {}", *last_seq, payload.seq); + *last_seq = Some(payload.seq); })); // Update the highest seq for the next tick - let r_new_highest_seq = r_kv_store.map(q!(|(_kv_store, highest_seq)| highest_seq)); - r_highest_seq_complete_cycle.complete_next_tick(r_new_highest_seq.clone().into()); + let r_new_highest_seq = r_kv_store.filter_map(q!(|(_kv_store, highest_seq)| highest_seq)); + r_highest_seq_complete_cycle.complete_next_tick(r_new_highest_seq.clone()); // Send checkpoints to the acceptors when we've processed enough payloads let (r_checkpointed_seqs_complete_cycle, r_checkpointed_seqs) = - replicas.tick_cycle::>(); - let r_max_checkpointed_seq = r_checkpointed_seqs - .persist() - .max() - .unwrap_or(replicas.singleton(q!(-1)).latest_tick()); + replicas.tick_cycle::>(); + let r_max_checkpointed_seq = r_checkpointed_seqs.persist().max().into_singleton(); let r_checkpoint_seq_new = r_max_checkpointed_seq .cross_singleton(r_new_highest_seq) .filter_map(q!( - move |(max_checkpointed_seq, new_highest_seq)| if new_highest_seq - max_checkpointed_seq - >= checkpoint_frequency as i32 + move |(max_checkpointed_seq, new_highest_seq)| if max_checkpointed_seq + .map(|m| new_highest_seq - m >= checkpoint_frequency) + .unwrap_or(true) { Some(new_highest_seq) } else { @@ -182,8 +177,8 @@ pub fn replica<'a, K: KvKey, V: KvValue>( r_checkpointed_seqs_complete_cycle.complete_next_tick(r_checkpoint_seq_new.clone()); // Tell clients that the payload has been committed. All ReplicaPayloads contain the client's machine ID (to string) as value. - ( - r_checkpoint_seq_new.all_ticks(), - p_to_replicas.filter_map(q!(|t| t.kv)), - ) + let r_to_clients = r_processable_payloads + .filter_map(q!(|payload| payload.kv)) + .all_ticks(); + (r_checkpoint_seq_new.all_ticks(), r_to_clients) } diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap index 711533647bc..2a93b6e9c6f 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap @@ -44,7 +44,7 @@ expression: built.ir() Union( Union( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | p1a . max_ballot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | p1a . max_ballot }), input: CycleSource { ident: Ident { sym: cycle_1, @@ -211,7 +211,7 @@ expression: built.ir() ), input: Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > >) , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > >) , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), input: Network { from_location: Cluster( 1, @@ -226,7 +226,7 @@ expression: built.ir() Operator { path: "map", args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > > (& data) . unwrap () . into ()) }", + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > > (& data) . unwrap () . into ()) }", ], }, ), @@ -237,13 +237,13 @@ expression: built.ir() Operator { path: "map", args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > > (& b) . unwrap ()) }", + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > > (& b) . unwrap ()) }", ], }, ), ), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot) , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((p1a , max_ballot) , log) | (p1a . ballot . proposer_id , P1b { ballot : p1a . ballot , max_ballot , accepted : log }) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot) , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((p1a , max_ballot) , log) | (p1a . ballot . proposer_id , P1b { ballot : p1a . ballot , max_ballot , accepted : log }) }), input: CrossSingleton( CrossSingleton( Tee { @@ -378,7 +378,7 @@ expression: built.ir() }, ), Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ckpnt , log) | log . clone () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ckpnt , log) | log . clone () }), input: CycleSource { ident: Ident { sym: cycle_0, @@ -409,16 +409,16 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received > f { Some (true) } else { None } }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , u32) , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , u32) , hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }), input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (p1b , ballot_num) | p1b . ballot . num == * ballot_num && p1b . ballot . proposer_id == p_id }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (p1b , ballot_num) | p1b . ballot . num == * ballot_num && p1b . ballot . proposer_id == p_id }), input: CrossSingleton( Persist( Inspect { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | println ! ("Proposer received P1b: {:?}" , p1b) }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | println ! ("Proposer received P1b: {:?}" , p1b) }), input: Tee { inner: , }, @@ -462,26 +462,26 @@ expression: built.ir() ), input: DeferTick( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , ()) , i32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , ()) , usize > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( Union( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , ()) , i32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , ()) , usize > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }), input: CrossSingleton( Union( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | max_slot + 1 }), + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | max_slot + 1 }), input: Tee { inner: : Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), input: Tee { inner: : FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , None) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_entry , new_entry | { if let Some (curr_entry_payload) = & mut curr_entry . 1 { let same_values = new_entry . value == curr_entry_payload . value ; let higher_ballot = new_entry . ballot > curr_entry_payload . ballot ; if same_values { curr_entry . 0 += 1 ; } if higher_ballot { curr_entry_payload . ballot = new_entry . ballot ; if ! same_values { curr_entry . 0 = 1 ; curr_entry_payload . value = new_entry . value ; } } } else { * curr_entry = (1 , Some (new_entry)) ; } } }), input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , std :: collections :: hash_map :: IntoIter < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | p1b . accepted . into_iter () }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > , std :: collections :: hash_map :: IntoIter < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | p1b . accepted . into_iter () }), input: Tee { inner: , }, @@ -509,7 +509,7 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }), input: Fold { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { inner: : CycleSource { ident: Ident { @@ -526,7 +526,7 @@ expression: built.ir() ), }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (num_payloads , next_slot) | next_slot + num_payloads as i32 }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , usize) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (num_payloads , next_slot) | next_slot + num_payloads }), input: CrossSingleton( Tee { inner: : Fold { @@ -534,7 +534,7 @@ expression: built.ir() acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , i32) , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (((index , payload) , next_slot) , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot : next_slot + index as i32 , value : Some (payload) } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , usize) , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (((index , payload) , next_slot) , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot : next_slot + index , value : Some (payload) } }), input: CrossSingleton( CrossSingleton( Enumerate( @@ -619,18 +619,18 @@ expression: built.ir() input: DeferTick( Difference( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _value) | slot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _value) | slot }), input: Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >)) , core :: option :: Option < (i32 , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , value)) | if count > f { Some ((slot , value)) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >)) , core :: option :: Option < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , value)) | if count > f { Some ((slot , value)) } else { None } }), input: Tee { inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >)) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (k , (count , v)) | (k , (count , v . unwrap ())) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >)) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (k , (count , v)) | (k , (count , v . unwrap ())) }), input: FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , core :: option :: Option < core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , None) }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , core :: option :: Option < core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | accum , value | { accum . 0 += 1 ; accum . 1 = Some (value) ; } }), input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , core :: option :: Option < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | if p2b . ballot == p2b . max_ballot { Some (((p2b . slot , p2b . ballot) , p2b . value)) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , core :: option :: Option < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | if p2b . ballot == p2b . max_ballot { Some (((p2b . slot , p2b . ballot) , p2b . value)) } else { None } }), input: Tee { inner: : Union( Tee { @@ -713,7 +713,7 @@ expression: built.ir() input: CrossSingleton( Union( FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | ((slot , (count , entry)) , ballot_num) | { let entry = entry . unwrap () ; if count <= f { Some (P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id , } , slot , value : entry . value , }) } else { None } } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | ((slot , (count , entry)) , ballot_num) | { let entry = entry . unwrap () ; if count <= f { Some (P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id , } , slot , value : entry . value , }) } else { None } } }), input: CrossSingleton( Tee { inner: , @@ -724,17 +724,17 @@ expression: built.ir() ), }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (slot , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot , value : None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (slot , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot , value : None } }), input: CrossSingleton( Difference( FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: ops :: Range < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | 0 .. max_slot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , std :: ops :: Range < usize > > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | 0 .. max_slot }), input: Tee { inner: , }, }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >)) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }), input: Tee { inner: , }, @@ -819,7 +819,7 @@ expression: built.ir() }, Tee { inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >)) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , _p2b)) | if count == 2 * f + 1 { Some (slot) } else { None } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((usize , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >)) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , _p2b)) | if count == 2 * f + 1 { Some (slot) } else { None } }), input: Tee { inner: , }, @@ -837,10 +837,10 @@ expression: built.ir() ), input: DeferTick( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_slot , p2b) | p2b }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_slot , p2b) | p2b }), input: AntiJoin( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | (p2b . slot , p2b) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , (usize , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | (p2b . slot , p2b) }), input: Tee { inner: , }, @@ -860,8 +860,8 @@ expression: built.ir() 1, ), input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (- 1 , HashMap :: new ()) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , checkpoint_or_p2a | { match checkpoint_or_p2a { CheckpointOrP2a :: Checkpoint (new_checkpoint) => { for slot in * prev_checkpoint .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = new_checkpoint ; } CheckpointOrP2a :: P2a (p2a) => { if p2a . slot > * prev_checkpoint && log . get (& p2a . slot) . map (| prev_p2a : & LogValue < _ > | p2a . ballot > prev_p2a . ballot) . unwrap_or (true) { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (None , HashMap :: new ()) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > >) , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , checkpoint_or_p2a | { match checkpoint_or_p2a { CheckpointOrP2a :: Checkpoint (new_checkpoint) => { for slot in (prev_checkpoint . unwrap_or (0)) .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = Some (new_checkpoint) ; } CheckpointOrP2a :: P2a (p2a) => { if prev_checkpoint . map (| prev | p2a . slot > prev) . unwrap_or (true) && log . get (& p2a . slot) . map (| prev_p2a : & LogValue < _ > | p2a . ballot > prev_p2a . ballot) . unwrap_or (true) { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } } }), input: Persist( Union( FilterMap { @@ -876,93 +876,81 @@ expression: built.ir() ), }, Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | min_seq | CheckpointOrP2a :: Checkpoint (min_seq) }), + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: CheckpointOrP2a < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | min_seq | CheckpointOrP2a :: Checkpoint (min_seq) }), input: Delta( - Union( - Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new < * curr { * curr = new ; } } }), + Reduce { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new < * curr { * curr = new ; } } }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , usize) , usize > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_sender , seq) | seq }), input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_sender , seq) | seq }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , i32) , ()) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , i32) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), - input: CrossSingleton( - Tee { - inner: : ReduceKeyed { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_seq , seq | { if seq > * curr_seq { * curr_seq = seq ; } } }), - input: Persist( - Network { - from_location: Cluster( - 3, - ), - from_key: None, - to_location: Cluster( - 1, - ), - to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , i32) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < i32 > (& data) . unwrap () . into ()) }", - ], - }, - ), + f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , usize) , ()) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , usize) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }), + input: CrossSingleton( + Tee { + inner: : ReduceKeyed { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_seq , seq | { if seq > * curr_seq { * curr_seq = seq ; } } }), + input: Persist( + Network { + from_location: Cluster( + 3, + ), + from_key: None, + to_location: Cluster( + 1, + ), + to_key: None, + serialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , usize) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < usize > (& data) . unwrap () . into ()) }", + ], + }, ), - instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < i32 > (& b) . unwrap ()) }", - ], - }, - ), + ), + instantiate_fn: , + deserialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < usize > (& b) . unwrap ()) }", + ], + }, ), - input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_1) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), - input: CycleSource { - ident: Ident { - sym: cycle_0, - }, - location_kind: Cluster( - 3, - ), + ), + input: FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_1) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + input: CycleSource { + ident: Ident { + sym: cycle_0, }, + location_kind: Cluster( + 3, + ), }, }, - ), - }, + }, + ), }, - Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), - input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received == f + 1 { Some (true) } else { None } }), - input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , i32) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), - input: Tee { - inner: , - }, + }, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }), + input: FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received == f + 1 { Some (true) } else { None } }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , usize) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), + input: Tee { + inner: , }, }, }, - ), - }, - }, - }, - Persist( - Source { - source: Iter( - { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; - 1 } ; [e] }, - ), - location_kind: Cluster( - 1, + }, ), }, - ), - ), + }, + }, ), }, ), @@ -989,64 +977,62 @@ expression: built.ir() ), input: DeferTick( Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , _) | { sorted_payload } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , usize) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , _) | { sorted_payload } }), input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq > * highest_seq }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , usize) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq > * highest_seq }), input: CrossSingleton( Tee { inner: : Sort( Union( - Tee { - inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), - input: Network { - from_location: Cluster( - 0, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }), + input: Network { + from_location: Cluster( + 0, + ), + from_key: None, + to_location: Cluster( + 3, + ), + to_key: None, + serialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (& data) . unwrap () . into ()) }", + ], + }, ), - from_key: None, - to_location: Cluster( - 3, + ), + instantiate_fn: , + deserialize_pipeline: Some( + Operator( + Operator { + path: "map", + args: [ + "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (& b) . unwrap ()) }", + ], + }, ), - to_key: None, - serialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (& data) . unwrap () . into ()) }", - ], + ), + input: FlatMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), + input: Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (slot , kv) | SequencedKv { seq : slot , kv } }), + input: AntiJoin( + Tee { + inner: , }, - ), - ), - instantiate_fn: , - deserialize_pipeline: Some( - Operator( - Operator { - path: "map", - args: [ - "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (& b) . unwrap ()) }", - ], + CycleSource { + ident: Ident { + sym: cycle_6, + }, + location_kind: Cluster( + 0, + ), }, ), - ), - input: FlatMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }), - input: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > >) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (slot , kv) | SequencedKv { seq : slot , kv } }), - input: AntiJoin( - Tee { - inner: , - }, - CycleSource { - ident: Ident { - sym: cycle_6, - }, - location_kind: Cluster( - 0, - ), - }, - ), - }, }, }, }, @@ -1063,32 +1049,40 @@ expression: built.ir() ), }, Tee { - inner: : Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < i32 > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | | - 1 }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | filled_slot , (sorted_payload , highest_seq) | { let next_slot = std :: cmp :: max (* filled_slot , highest_seq) ; * filled_slot = if sorted_payload . seq == next_slot + 1 { sorted_payload . seq } else { * filled_slot } ; } }), - input: CrossSingleton( - Tee { - inner: , - }, - Union( - CycleSource { - ident: Ident { - sym: cycle_2, - }, - location_kind: Cluster( - 3, - ), + inner: : FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < usize > , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | v | v }), + input: Fold { + init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | | None }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < usize > , (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , core :: option :: Option < usize >) , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | filled_slot , (sorted_payload , highest_seq) | { let expected_next_slot = std :: cmp :: max (filled_slot . map (| v | v + 1) . unwrap_or (0) , highest_seq . map (| v | v + 1) . unwrap_or (0) ,) ; if sorted_payload . seq == expected_next_slot { * filled_slot = Some (sorted_payload . seq) ; } } }), + input: CrossSingleton( + Tee { + inner: , }, - Source { - source: Iter( - { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos_kv :: * ; - 1 } ; [e] }, - ), - location_kind: Cluster( - 3, + Union( + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < usize > > ({ use hydroflow_plus :: __staged :: singleton :: * ; | v | Some (v) }), + input: CycleSource { + ident: Ident { + sym: cycle_2, + }, + location_kind: Cluster( + 3, + ), + }, + }, + Persist( + Source { + source: Iter( + { use hydroflow_plus :: __staged :: location :: * ; let e = { use hydroflow_plus :: __staged :: singleton :: * ; None } ; [e] }, + ), + location_kind: Cluster( + 3, + ), + }, ), - }, + ), ), - ), + }, }, }, ), @@ -1105,23 +1099,23 @@ expression: built.ir() ), input: DeferTick( Tee { - inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (_kv_store , highest_seq) | highest_seq }), + inner: : FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , core :: option :: Option < usize >) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (_kv_store , highest_seq) | highest_seq }), input: Fold { - init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | | (HashMap :: new () , - 1) }), - acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (kv_store , last_seq) , payload | { if let Some (kv) = payload . kv { kv_store . insert (kv . key , kv . value) ; } debug_assert ! (payload . seq == * last_seq + 1 , "Hole in log between seq {} and {}" , * last_seq , payload . seq) ; * last_seq = payload . seq ; } }), + init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , core :: option :: Option < usize >) > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | | (HashMap :: new () , None) }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , core :: option :: Option < usize >) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , () > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (kv_store , last_seq) , payload | { if let Some (kv) = payload . kv { kv_store . insert (kv . key , kv . value) ; } debug_assert ! (payload . seq == (last_seq . map (| s | s + 1) . unwrap_or (0)) , "Hole in log between seq {:?} and {}" , * last_seq , payload . seq) ; * last_seq = Some (payload . seq) ; } }), input: Persist( Tee { - inner: : Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , _) | { sorted_payload } }), + inner: : Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , usize) , hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , _) | { sorted_payload } }), input: Filter { - f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq <= * highest_seq }), + f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , usize) , bool > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq <= * highest_seq }), input: CrossSingleton( Tee { inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1142,27 +1136,30 @@ expression: built.ir() ), input: DeferTick( Tee { - inner: : FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , i32) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; let checkpoint_frequency = 1usize ; move | (max_checkpointed_seq , new_highest_seq) | if new_highest_seq - max_checkpointed_seq >= checkpoint_frequency as i32 { Some (new_highest_seq) } else { None } }), + inner: : FilterMap { + f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , usize) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; let checkpoint_frequency = 1usize ; move | (max_checkpointed_seq , new_highest_seq) | if max_checkpointed_seq . map (| m | new_highest_seq - m >= checkpoint_frequency) . unwrap_or (true) { Some (new_highest_seq) } else { None } }), input: CrossSingleton( Union( - Reduce { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), - input: Persist( - CycleSource { - ident: Ident { - sym: cycle_3, + Map { + f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < usize > > ({ use hydroflow_plus :: __staged :: singleton :: * ; | v | Some (v) }), + input: Reduce { + f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), + input: Persist( + CycleSource { + ident: Ident { + sym: cycle_3, + }, + location_kind: Cluster( + 3, + ), }, - location_kind: Cluster( - 3, - ), - }, - ), + ), + }, }, Persist( Source { source: Iter( - { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos_kv :: * ; - 1 } ; [e] }, + { use hydroflow_plus :: __staged :: location :: * ; let e = { use hydroflow_plus :: __staged :: singleton :: * ; None } ; [e] }, ), location_kind: Cluster( 3, @@ -1171,7 +1168,7 @@ expression: built.ir() ), ), Tee { - inner: , + inner: , }, ), }, @@ -1186,7 +1183,7 @@ expression: built.ir() 3, ), input: Tee { - inner: , + inner: , }, }, CycleSink { @@ -1250,7 +1247,7 @@ expression: built.ir() input: DeferTick( AntiJoin( Tee { - inner: : Union( + inner: : Union( Map { f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) , (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sender , replica_payload) | (replica_payload . key , sender) }), input: Network { @@ -1286,9 +1283,9 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (payload . value , payload) }), input: FilterMap { - f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | t | t . kv }), + f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_kv :: SequencedKv < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > > ({ use crate :: __staged :: cluster :: paxos_kv :: * ; | payload | payload . kv }), input: Tee { - inner: , + inner: , }, }, }, @@ -1305,13 +1302,13 @@ expression: built.ir() ), }, Tee { - inner: : FilterMap { + inner: : FilterMap { f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , usize) , core :: option :: Option < u32 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let f = 1usize ; move | (key , count) | { if count == f + 1 { Some (key) } else { None } } }), input: FoldKeyed { init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | 0 }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_kv :: Replica > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | curr_count , _sender | { * curr_count += 1 ; } }), input: Tee { - inner: , + inner: , }, }, }, @@ -1330,9 +1327,9 @@ expression: built.ir() FlatMap { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; let num_clients_per_node = 1usize ; move | leader_ballot | (0 .. num_clients_per_node) . map (move | i | (leader_ballot , KvPayload { key : i as u32 , value : c_id })) }), input: Tee { - inner: : Delta( + inner: : Delta( Tee { - inner: : Reduce { + inner: : Reduce { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }), input: Persist( Inspect { @@ -1356,10 +1353,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer >) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_kv :: KvPayload < u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (key , cur_leader) | (cur_leader , KvPayload { key , value : c_id }) }), input: CrossSingleton( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1378,7 +1375,7 @@ expression: built.ir() input: Union( Union( Tee { - inner: : CycleSource { + inner: : CycleSource { ident: Ident { sym: cycle_3, }, @@ -1392,16 +1389,16 @@ expression: built.ir() input: Map { f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , std :: time :: SystemTime > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | SystemTime :: now () }), input: Tee { - inner: , + inner: , }, }, }, ), Tee { - inner: : Map { + inner: : Map { f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , (usize , std :: time :: SystemTime) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | key | (key as usize , SystemTime :: now ()) }), input: Tee { - inner: , + inner: , }, }, }, @@ -1428,10 +1425,10 @@ expression: built.ir() f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (std :: time :: SystemTime , std :: time :: SystemTime)) , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time) . unwrap () . as_micros ()) }), input: Join( Tee { - inner: , + inner: , }, Tee { - inner: , + inner: , }, ), }, @@ -1439,7 +1436,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }), input: Tee { - inner: : Source { + inner: : Source { source: Stream( { use hydroflow_plus :: __staged :: location :: * ; let interval = { use crate :: __staged :: cluster :: paxos_bench :: * ; Duration :: from_secs (1) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval (interval)) }, ), @@ -1469,7 +1466,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , u32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, Map { @@ -1480,7 +1477,7 @@ expression: built.ir() init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }), acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }), input: Tee { - inner: , + inner: , }, }, }, @@ -1492,7 +1489,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | (0 , true) }), input: Tee { - inner: , + inner: , }, }, ), @@ -1503,7 +1500,7 @@ expression: built.ir() Map { f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }), input: Tee { - inner: , + inner: , }, }, ),