diff --git a/hydroflow_plus/src/singleton.rs b/hydroflow_plus/src/singleton.rs
index 411404226967..54374e6c3f14 100644
--- a/hydroflow_plus/src/singleton.rs
+++ b/hydroflow_plus/src/singleton.rs
@@ -580,6 +580,10 @@ impl<'a, T, N: Location<'a>> Optional<T, Bounded, Tick, N> {
             .map(q!(|(d, _signal)| d))
     }
 
+    pub fn then<U>(self, value: Singleton<U, Bounded, Tick, N>) -> Optional<U, Bounded, Tick, N> {
+        value.continue_if(self)
+    }
+
     pub fn continue_unless<U>(
         self,
         other: Optional<U, Bounded, Tick, N>,
diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs
index 488be19c01e4..0667dae4e9d4 100644
--- a/hydroflow_plus_test/src/cluster/paxos.rs
+++ b/hydroflow_plus_test/src/cluster/paxos.rs
@@ -21,12 +21,12 @@ pub trait PaxosPayload:
 pub struct Ballot {
     // Note: Important that num comes before id, since Ord is defined lexicographically
     pub num: u32,
-    pub id: ClusterId<Proposer>,
+    pub proposer_id: ClusterId<Proposer>,
 }
 
 impl LeaderElected for Ballot {
     fn leader_id(&self) -> ClusterId<Proposer> {
-        self.id
+        self.proposer_id
     }
 }
 
@@ -57,8 +57,8 @@ struct P2a<P> {
 
 #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
 struct P2b<P> {
+    victory: bool,
     ballot: Ballot,
-    max_ballot: Ballot,
     slot: i32,
     value: P,
 }
@@ -119,7 +119,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
 
     let (p_is_leader, p_log_to_try_commit, p_max_slot, p_log_holes) = p_p1b(
         &proposers,
-        a_to_proposers_p1b.inspect(q!(|(_, p1b)| println!("Proposer received P1b: {:?}", p1b))),
+        a_to_proposers_p1b.inspect(q!(|p1b| println!("Proposer received P1b: {:?}", p1b))),
         p_ballot_num.clone(),
         p_has_largest_ballot,
         f,
@@ -152,7 +152,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
     let p_to_clients_new_leader_elected = p_is_leader.clone()
         .continue_unless(p_next_slot)
         .cross_singleton(p_ballot_num)
-        .map(q!(move |(_is_leader, ballot_num)| Ballot { num: ballot_num, id: p_id})) // Only tell the clients once when leader election concludes
+        .map(q!(move |(_is_leader, ballot_num)| Ballot { num: ballot_num, proposer_id: p_id})) // Only tell the clients once when leader election concludes
         .all_ticks();
     // End tell clients that leader election has completed
     let p_to_replicas = p_p2b(&proposers, a_to_proposers_p2b, f);
@@ -197,8 +197,8 @@ fn acceptor<'a, P: PaxosPayload, R>(
     acceptors: &Cluster<'a, Acceptor>,
     f: usize,
 ) -> (
-    Stream<(ClusterId<Acceptor>, P1b<P>), Unbounded, NoTick, Cluster<'a, Proposer>>,
-    Stream<(ClusterId<Acceptor>, P2b<P>), Unbounded, NoTick, Cluster<'a, Proposer>>,
+    Stream<P1b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
+    Stream<P2b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
 ) {
     // Get the latest checkpoint sequence per replica
     let a_checkpoint_largest_seqs =
@@ -229,7 +229,7 @@ fn acceptor<'a, P: PaxosPayload, R>(
                 // Create tuple with checkpoint number and dummy p2a
                 ballot: Ballot {
                     num: 0,
-                    id: ClusterId::from_raw(0)
+                    proposer_id: ClusterId::from_raw(0)
                 },
                 slot: -1,
                 value: Default::default()
@@ -243,7 +243,7 @@ fn acceptor<'a, P: PaxosPayload, R>(
         .max()
         .unwrap_or(acceptors.singleton(q!(Ballot {
             num: 0,
-            id: ClusterId::from_raw(0)
+            proposer_id: ClusterId::from_raw(0)
         })));
     let a_p2as_to_place_in_log = p_to_acceptors_p2a
         .clone()
@@ -303,7 +303,7 @@ fn acceptor<'a, P: PaxosPayload, R>(
         .cross_singleton(a_max_ballot.clone().latest_tick())
         .cross_singleton(a_log)
         .map(q!(|((p1a, max_ballot), (_prev_checkpoint, log))| (
-            p1a.ballot.id,
+            p1a.ballot.proposer_id,
             P1b {
                 ballot: p1a.ballot,
                 max_ballot,
@@ -311,33 +311,27 @@ fn acceptor<'a, P: PaxosPayload, R>(
             }
         )))
         .all_ticks()
-        .send_bincode(proposers);
+        .send_bincode_interleaved(proposers);
     let a_to_proposers_p2b_new = p_to_acceptors_p2a
         .tick_batch()
         .cross_singleton(a_max_ballot.latest_tick())
         .map(q!(|(p2a, max_ballot)| (
-            p2a.ballot.id,
+            p2a.ballot.proposer_id,
             P2b {
+                victory: p2a.ballot == max_ballot,
                 ballot: p2a.ballot,
-                max_ballot,
                 slot: p2a.slot,
                 value: p2a.value
             }
         )))
         .all_ticks()
-        .send_bincode(proposers);
+        .send_bincode_interleaved(proposers);
     (a_to_proposers_p1b_new, a_to_proposers_p2b_new)
 }
 
-#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
 fn p_p2b<'a, P: PaxosPayload>(
     proposers: &Cluster<'a, Proposer>,
-    a_to_proposers_p2b: Stream<
-        (ClusterId<Acceptor>, P2b<P>),
-        Unbounded,
-        NoTick,
-        Cluster<'a, Proposer>,
-    >,
+    a_to_proposers_p2b: Stream<P2b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
     f: usize,
 ) -> Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>> {
     let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposers.tick_cycle();
@@ -348,62 +342,55 @@ fn p_p2b<'a, P: PaxosPayload>(
         .union(p_persisted_p2bs);
     let p_count_matching_p2bs = p_p2b
         .clone()
-        .filter_map(q!(|(sender, p2b)| if p2b.ballot == p2b.max_ballot {
+        .filter_map(q!(|p2b| if p2b.victory {
             // Only consider p2bs where max ballot = ballot, which means that no one preempted us
-            Some((p2b.slot, (sender, p2b)))
+            Some(((p2b.slot, p2b.ballot), p2b.value))
         } else {
             None
         }))
         .fold_keyed(
-            q!(|| (
-                0,
-                P2b {
-                    ballot: Ballot {
-                        num: 0,
-                        id: ClusterId::from_raw(0)
-                    },
-                    max_ballot: Ballot {
-                        num: 0,
-                        id: ClusterId::from_raw(0)
-                    },
-                    slot: 0,
-                    value: Default::default()
-                }
-            )),
-            q!(|accum, (_sender, p2b)| {
+            q!(|| (0, Default::default())),
+            q!(|accum, value| {
+                // TODO(shadaj): why is sender unused? should we de-dup?
                 accum.0 += 1;
-                accum.1 = p2b;
+                accum.1 = value;
             }),
         );
-    let p_p2b_quorum_reached = p_count_matching_p2bs
-        .clone()
-        .filter(q!(move |(_slot, (count, _p2b))| *count > f));
+    let p_p2b_quorum_reached =
+        p_count_matching_p2bs
+            .clone()
+            .filter_map(q!(move |((slot, _ballot), (count, value))| if count > f {
+                Some((slot, value))
+            } else {
+                None
+            }));
     let p_to_replicas = p_p2b_quorum_reached
         .clone()
         .anti_join(p_broadcasted_p2b_slots) // Only tell the replicas about committed values once
-        .map(q!(|(_slot, (_count, p2b))| (p2b.slot, p2b.value)))
+        .map(q!(|(slot, value)| (slot, value)))
         .all_ticks();
 
     let p_p2b_all_commit_slots =
-        p_count_matching_p2bs
-            .clone()
-            .filter_map(q!(move |(slot, (count, _p2b))| if count == 2 * f + 1 {
+        p_count_matching_p2bs.clone().filter_map(q!(
+            move |((slot, _ballot), (count, _p2b))| if count == 2 * f + 1 {
                 Some(slot)
             } else {
                 None
-            }));
+            }
+        ));
     // p_p2b_all_commit_slots.inspect(q!(|slot: i32| println!("Proposer slot all received: {:?}", slot)));
     let p_broadcasted_p2b_slots_new = p_p2b_quorum_reached
         .clone()
-        .map(q!(|(slot, (_count, _p2b))| slot))
+        .map(q!(|(slot, _value)| slot))
         .filter_not_in(p_p2b_all_commit_slots.clone());
     // p_broadcasted_p2b_slots_new.inspect(q!(|slot: i32| println!("Proposer slot broadcasted: {:?}", slot)));
     p_broadcasted_p2b_slots_complete_cycle.complete_next_tick(p_broadcasted_p2b_slots_new);
     let p_persisted_p2bs_new = p_p2b
         .clone()
-        .map(q!(|(sender, p2b)| (p2b.slot, (sender, p2b))))
+        .map(q!(|p2b| (p2b.slot, p2b)))
         .anti_join(p_p2b_all_commit_slots.clone())
-        .map(q!(|(_slot, (sender, p2b))| (sender, p2b)));
+        .map(q!(|(_slot, p2b)| p2b));
+    // TOOD: only persist if we are the leader
     // p_persisted_p2bs_new.inspect(q!(|(sender, p2b): (u32, P2b)| println!("Proposer persisting p2b: {:?}", p2b)));
     p_persisted_p2bs_complete_cycle.complete_next_tick(p_persisted_p2bs_new);
     p_to_replicas
@@ -446,7 +433,7 @@ fn p_p2a<'a, P: PaxosPayload>(
         // .inspect(q!(|next| println!("{} p_indexed_payloads next slot: {}", context.current_tick(), next))))
         .cross_singleton(p_ballot_num.clone())
         // .inspect(q!(|ballot_num| println!("{} p_indexed_payloads ballot_num: {}", context.current_tick(), ballot_num))))
-        .map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { ballot: Ballot { num: ballot_num, id: p_id }, slot: next_slot + index as i32, value: payload }));
+        .map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { ballot: Ballot { num: ballot_num, proposer_id: p_id }, slot: next_slot + index as i32, value: payload }));
     // .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a)));
     let p_to_acceptors_p2a = p_log_to_try_commit
         .union(p_log_holes)
@@ -489,12 +476,7 @@ fn p_p2a<'a, P: PaxosPayload>(
 #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
 fn p_p1b<'a, P: PaxosPayload>(
     proposers: &Cluster<'a, Proposer>,
-    a_to_proposers_p1b: Stream<
-        (ClusterId<Acceptor>, P1b<P>),
-        Unbounded,
-        NoTick,
-        Cluster<'a, Proposer>,
-    >,
+    a_to_proposers_p1b: Stream<P1b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
     p_ballot_num: Singleton<u32, Bounded, Tick, Cluster<'a, Proposer>>,
     p_has_largest_ballot: Optional<(Ballot, u32), Bounded, Tick, Cluster<'a, Proposer>>,
     f: usize,
@@ -506,30 +488,26 @@ fn p_p1b<'a, P: PaxosPayload>(
 ) {
     let p_id = proposers.self_id();
     let p_relevant_p1bs = a_to_proposers_p1b
-        .clone()
         .tick_prefix()
+        // NOTE: because `p_ballot_num` grows monotonically across ticks, we could garbage gollect
+        // but we don't do that here since leader election is a rare event
         .cross_singleton(p_ballot_num.clone())
-        .filter(q!(move |((_sender, p1b), ballot_num)| p1b.ballot
-            == Ballot {
-                num: *ballot_num,
-                id: p_id
+        .filter(q!(move |(p1b, ballot_num)| p1b.ballot.num == *ballot_num && p1b.ballot.proposer_id == p_id))
+        .map(q!(|t| t.0));
+    let p_received_quorum_of_p1bs =
+        p_relevant_p1bs
+            .clone()
+            .count()
+            .filter_map(q!(move |num_received| if num_received > f {
+                Some(true)
+            } else {
+                None
             }));
-    let p_received_quorum_of_p1bs = p_relevant_p1bs
-        .clone()
-        .map(q!(|((sender, _p1b), _ballot_num)| { sender }))
-        .unique()
-        .count()
-        .filter_map(q!(move |num_received| if num_received > f {
-            Some(true)
-        } else {
-            None
-        }));
     let p_is_leader = p_received_quorum_of_p1bs.continue_if(p_has_largest_ballot.clone());
 
     let p_p1b_highest_entries_and_count = p_relevant_p1bs
-        .clone()
-        .flat_map(q!(|((_, p1b), _)| p1b.accepted.into_iter())) // Convert HashMap log back to stream
-        .fold_keyed(q!(|| (0, LogValue { ballot: Ballot { num: 0, id: ClusterId::from_raw(0) }, value: Default::default() })), q!(|curr_entry, new_entry| {
+        .flat_map(q!(|p1b| p1b.accepted.into_iter())) // Convert HashMap log back to stream
+        .fold_keyed(q!(|| (0, LogValue { ballot: Ballot { num: 0, proposer_id: ClusterId::from_raw(0) }, value: Default::default() })), q!(|curr_entry, new_entry| {
             let same_values = new_entry.value == curr_entry.1.value;
             let higher_ballot = new_entry.ballot > curr_entry.1.ballot;
             // Increment count if the values are the same
@@ -554,7 +532,7 @@ fn p_p1b<'a, P: PaxosPayload>(
                 Some(P2a {
                     ballot: Ballot {
                         num: ballot_num,
-                        id: p_id,
+                        proposer_id: p_id,
                     },
                     slot,
                     value: entry.value,
@@ -578,7 +556,7 @@ fn p_p1b<'a, P: PaxosPayload>(
         .map(q!(move |(slot, ballot_num)| P2a {
             ballot: Ballot {
                 num: ballot_num,
-                id: p_id
+                proposer_id: p_id
             },
             slot,
             value: Default::default()
@@ -587,36 +565,21 @@ fn p_p1b<'a, P: PaxosPayload>(
 }
 
 // Proposer logic to calculate the largest ballot received so far.
-#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
 fn p_max_ballot<'a, P: PaxosPayload>(
     proposers: &Cluster<'a, Proposer>,
-    a_to_proposers_p1b: Stream<
-        (ClusterId<Acceptor>, P1b<P>),
-        Unbounded,
-        NoTick,
-        Cluster<'a, Proposer>,
-    >,
-    a_to_proposers_p2b: Stream<
-        (ClusterId<Acceptor>, P2b<P>),
-        Unbounded,
-        NoTick,
-        Cluster<'a, Proposer>,
-    >,
+    a_to_proposers_p1b: Stream<P1b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
+    a_to_proposers_p2b: Stream<P2b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
     p_to_proposers_i_am_leader: Stream<Ballot, Unbounded, NoTick, Cluster<'a, Proposer>>,
 ) -> Singleton<Ballot, Unbounded, NoTick, Cluster<'a, Proposer>> {
-    let p_received_p1b_ballots = a_to_proposers_p1b
-        .clone()
-        .map(q!(|(_, p1b)| p1b.max_ballot));
-    let p_received_p2b_ballots = a_to_proposers_p2b
-        .clone()
-        .map(q!(|(_, p2b)| p2b.max_ballot));
+    let p_received_p1b_ballots = a_to_proposers_p1b.clone().map(q!(|p1b| p1b.max_ballot));
+    let p_received_p2b_ballots = a_to_proposers_p2b.clone().map(q!(|p2b| p2b.ballot));
     p_received_p1b_ballots
         .union(p_received_p2b_ballots)
         .union(p_to_proposers_i_am_leader)
         .max()
         .unwrap_or(proposers.singleton(q!(Ballot {
             num: 0,
-            id: ClusterId::from_raw(0)
+            proposer_id: ClusterId::from_raw(0)
         })))
 }
 
@@ -640,7 +603,7 @@ fn p_ballot_calc<'a>(
             if received_max_ballot
                 > (Ballot {
                     num: ballot_num,
-                    id: p_id,
+                    proposer_id: p_id,
                 })
             {
                 received_max_ballot.num + 1
@@ -657,7 +620,7 @@ fn p_ballot_calc<'a>(
             move |(received_max_ballot, ballot_num)| *received_max_ballot
                 <= Ballot {
                     num: *ballot_num,
-                    id: p_id
+                    proposer_id: p_id
                 }
         ));
 
@@ -685,21 +648,55 @@ fn p_p1a<'a>(
     Stream<P1a, Unbounded, NoTick, Cluster<'a, Acceptor>>,
 ) {
     let p_id = proposers.self_id();
-    let p_to_proposers_i_am_leader_new = p_ballot_num
+    let p_to_proposers_i_am_leader_new = p_is_leader
         .clone()
+        .then(p_ballot_num.clone())
+        .latest()
+        .sample_every(q!(Duration::from_secs(i_am_leader_send_timeout)))
+        .map(q!(move |ballot_num| Ballot {
+            num: ballot_num,
+            proposer_id: p_id
+        }))
+        .broadcast_bincode_interleaved(proposers);
+
+    let p_leader_expired = p_leader_expired(
+        p_to_proposers_i_am_leader,
+        p_is_leader,
+        i_am_leader_check_timeout,
+    );
+
+    let p_id = proposers.self_id();
+
+    // Add random delay depending on node ID so not everyone sends p1a at the same time
+    let p_to_acceptors_p1a = p_leader_expired
+        .then(p_ballot_num)
         .continue_if(
             proposers
-                .source_interval(q!(Duration::from_secs(i_am_leader_send_timeout)))
+                .source_interval_delayed(
+                    q!(Duration::from_secs(
+                        (p_id.raw_id * i_am_leader_check_timeout_delay_multiplier as u32).into()
+                    )),
+                    q!(Duration::from_secs(i_am_leader_check_timeout)),
+                )
                 .latest_tick(),
         )
-        .continue_if(p_is_leader.clone())
-        .map(q!(move |ballot_num| Ballot {
-            num: ballot_num,
-            id: p_id
+        .map(q!(move |ballot_num| P1a {
+            ballot: Ballot {
+                num: ballot_num,
+                proposer_id: p_id
+            }
         }))
         .all_ticks()
-        .broadcast_bincode_interleaved(proposers);
+        .inspect(q!(|_| println!("Proposer leader expired, sending P1a")))
+        .broadcast_bincode_interleaved(acceptors);
+    (p_to_proposers_i_am_leader_new, p_to_acceptors_p1a)
+}
 
+fn p_leader_expired<'a>(
+    p_to_proposers_i_am_leader: Stream<Ballot, Unbounded, NoTick, Cluster<'a, Proposer>>,
+    p_is_leader: Optional<bool, Bounded, Tick, Cluster<'a, Proposer>>,
+    i_am_leader_check_timeout: u64, // How often to check if heartbeat expired
+) -> Optional<Option<Instant>, Bounded, Tick, Cluster<'a, Proposer>> {
     let p_latest_received_i_am_leader = p_to_proposers_i_am_leader.clone().fold(
         q!(|| None),
         q!(|latest, _| {
@@ -707,34 +704,17 @@ fn p_p1a<'a>(
             *latest = Some(Instant::now());
         }),
     );
-    // Add random delay depending on node ID so not everyone sends p1a at the same time
-    let p_leader_expired = proposers.source_interval_delayed(q!(Duration::from_secs((p_id.raw_id * i_am_leader_check_timeout_delay_multiplier as u32).into())), q!(Duration::from_secs(i_am_leader_check_timeout)))
-        .cross_singleton(p_latest_received_i_am_leader.clone())
+
+    p_latest_received_i_am_leader
+        .clone()
         .latest_tick()
-        // .inspect(q!(|v| println!("Proposer checking if leader expired")))
-        // .continue_if(p_is_leader.clone().count().filter(q!(|c| *c == 0)).inspect(q!(|c| println!("Proposer is_leader count: {}", c))))
         .continue_unless(p_is_leader)
-        .filter(q!(move |(_, latest_received_i_am_leader)| {
+        .filter(q!(move |latest_received_i_am_leader| {
             if let Some(latest_received_i_am_leader) = latest_received_i_am_leader {
-                (Instant::now().duration_since(*latest_received_i_am_leader)) > Duration::from_secs(i_am_leader_check_timeout)
+                (Instant::now().duration_since(*latest_received_i_am_leader))
+                    > Duration::from_secs(i_am_leader_check_timeout)
             } else {
                 true
             }
-        }));
-    p_leader_expired
-        .clone()
-        .all_ticks()
-        .for_each(q!(|_| println!("Proposer leader expired")));
-
-    let p_to_acceptors_p1a = p_ballot_num
-        .continue_if(p_leader_expired)
-        .map(q!(move |ballot_num| P1a {
-            ballot: Ballot {
-                num: ballot_num,
-                id: p_id
-            }
         }))
-        .all_ticks()
-        .broadcast_bincode_interleaved(acceptors);
-    (p_to_proposers_i_am_leader_new, p_to_acceptors_p1a)
 }
diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap
index f20193cd83b1..af57d72b873f 100644
--- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap
+++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos_bench__tests__paxos_ir.snap
@@ -22,7 +22,7 @@ expression: built.ir()
             2,
         ),
         input: Map {
-            f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , u32 > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (received_max_ballot , ballot_num) | { if received_max_ballot > (Ballot { num : ballot_num , id : p_id , }) { received_max_ballot . num + 1 } else { ballot_num } } }),
+            f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , u32 > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (received_max_ballot , ballot_num) | { if received_max_ballot > (Ballot { num : ballot_num , proposer_id : p_id , }) { received_max_ballot . num + 1 } else { ballot_num } } }),
             input: CrossSingleton(
                 Tee {
                     inner: <tee 0>: Union(
@@ -32,7 +32,7 @@ expression: built.ir()
                                 Union(
                                     Union(
                                         Map {
-                                            f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ , p1b) | p1b . max_ballot }),
+                                            f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | p1b . max_ballot }),
                                             input: Tee {
                                                 inner: <tee 1>: CycleSource {
                                                     ident: Ident {
@@ -45,7 +45,7 @@ expression: built.ir()
                                             },
                                         },
                                         Map {
-                                            f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ , p2b) | p2b . max_ballot }),
+                                            f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | p2b . ballot }),
                                             input: Tee {
                                                 inner: <tee 2>: CycleSource {
                                                     ident: Ident {
@@ -74,7 +74,7 @@ expression: built.ir()
                         Persist(
                             Source {
                                 source: Iter(
-                                    { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; Ballot { num : 0 , id : ClusterId :: from_raw (0) } } ; [e] },
+                                    { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } } ; [e] },
                                 ),
                                 location_kind: Cluster(
                                     2,
@@ -108,105 +108,6 @@ expression: built.ir()
             ),
         },
     },
-    ForEach {
-        f: stageleft :: runtime_support :: fn1_type_hint :: < (tokio :: time :: Instant , core :: option :: Option < tokio :: time :: Instant >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | _ | println ! ("Proposer leader expired") }),
-        input: Tee {
-            inner: <tee 5>: Filter {
-                f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (tokio :: time :: Instant , core :: option :: Option < tokio :: time :: Instant >) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; move | (_ , latest_received_i_am_leader) | { if let Some (latest_received_i_am_leader) = latest_received_i_am_leader { (Instant :: now () . duration_since (* latest_received_i_am_leader)) > Duration :: from_secs (i_am_leader_check_timeout) } else { true } } }),
-                input: Map {
-                    f: stageleft :: runtime_support :: fn1_type_hint :: < ((tokio :: time :: Instant , core :: option :: Option < tokio :: time :: Instant >) , ()) , (tokio :: time :: Instant , core :: option :: Option < tokio :: time :: Instant >) > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
-                    input: CrossSingleton(
-                        CrossSingleton(
-                            Source {
-                                source: Stream(
-                                    { use hydroflow_plus :: __staged :: location :: * ; let delay = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout_delay_multiplier = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; Duration :: from_secs ((p_id . raw_id * i_am_leader_check_timeout_delay_multiplier as u32) . into ()) } ; let interval = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; Duration :: from_secs (i_am_leader_check_timeout) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval_at (tokio :: time :: Instant :: now () + delay , interval)) },
-                                ),
-                                location_kind: Cluster(
-                                    2,
-                                ),
-                            },
-                            Tee {
-                                inner: <tee 6>: Fold {
-                                    init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < tokio :: time :: Instant > > ({ use crate :: __staged :: cluster :: paxos :: * ; | | None }),
-                                    acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | latest , _ | { * latest = Some (Instant :: now ()) ; } }),
-                                    input: Persist(
-                                        Tee {
-                                            inner: <tee 3>,
-                                        },
-                                    ),
-                                },
-                            },
-                        ),
-                        Map {
-                            f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
-                            input: Filter {
-                                f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }),
-                                input: Fold {
-                                    init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
-                                    acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
-                                    input: Tee {
-                                        inner: <tee 7>: Map {
-                                            f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
-                                            input: CrossSingleton(
-                                                FilterMap {
-                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received > f { Some (true) } else { None } }),
-                                                    input: Fold {
-                                                        init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
-                                                        acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
-                                                        input: Unique(
-                                                            Map {
-                                                                f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , u32) , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((sender , _p1b) , _ballot_num) | { sender } }),
-                                                                input: Tee {
-                                                                    inner: <tee 8>: Filter {
-                                                                        f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ((_sender , p1b) , ballot_num) | p1b . ballot == Ballot { num : * ballot_num , id : p_id } }),
-                                                                        input: CrossSingleton(
-                                                                            Persist(
-                                                                                Tee {
-                                                                                    inner: <tee 9>: Inspect {
-                                                                                        f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_ , p1b) | println ! ("Proposer received P1b: {:?}" , p1b) }),
-                                                                                        input: Tee {
-                                                                                            inner: <tee 1>,
-                                                                                        },
-                                                                                    },
-                                                                                },
-                                                                            ),
-                                                                            Tee {
-                                                                                inner: <tee 4>,
-                                                                            },
-                                                                        ),
-                                                                    },
-                                                                },
-                                                            },
-                                                        ),
-                                                    },
-                                                },
-                                                Map {
-                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
-                                                    input: Tee {
-                                                        inner: <tee 10>: Filter {
-                                                            f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (received_max_ballot , ballot_num) | * received_max_ballot <= Ballot { num : * ballot_num , id : p_id } }),
-                                                            input: CrossSingleton(
-                                                                Tee {
-                                                                    inner: <tee 0>,
-                                                                },
-                                                                Tee {
-                                                                    inner: <tee 4>,
-                                                                },
-                                                            ),
-                                                        },
-                                                    },
-                                                },
-                                            ),
-                                        },
-                                    },
-                                },
-                            },
-                        },
-                    ),
-                },
-            },
-        },
-    },
     CycleSink {
         ident: Ident {
             sym: cycle_0,
@@ -249,7 +150,7 @@ expression: built.ir()
                 input: FlatMap {
                     f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > > > (__hydroflow_plus_cluster_ids_2) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }),
                     input: Map {
-                        f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ballot_num | Ballot { num : ballot_num , id : p_id } }),
+                        f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ballot_num | Ballot { num : ballot_num , proposer_id : p_id } }),
                         input: Map {
                             f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
                             input: CrossSingleton(
@@ -260,22 +161,70 @@ expression: built.ir()
                                             inner: <tee 4>,
                                         },
                                         Map {
-                                            f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
-                                            input: Source {
-                                                source: Interval(
-                                                    { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_send_timeout = 1u64 ; Duration :: from_secs (i_am_leader_send_timeout) },
-                                                ),
-                                                location_kind: Cluster(
-                                                    2,
-                                                ),
+                                            f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
+                                            input: Tee {
+                                                inner: <tee 5>: Map {
+                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
+                                                    input: CrossSingleton(
+                                                        FilterMap {
+                                                            f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received > f { Some (true) } else { None } }),
+                                                            input: Fold {
+                                                                init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
+                                                                acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
+                                                                input: Tee {
+                                                                    inner: <tee 6>: Map {
+                                                                        f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , u32) , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; | t | t . 0 }),
+                                                                        input: Filter {
+                                                                            f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (p1b , ballot_num) | p1b . ballot . num == * ballot_num && p1b . ballot . proposer_id == p_id }),
+                                                                            input: CrossSingleton(
+                                                                                Persist(
+                                                                                    Inspect {
+                                                                                        f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | println ! ("Proposer received P1b: {:?}" , p1b) }),
+                                                                                        input: Tee {
+                                                                                            inner: <tee 1>,
+                                                                                        },
+                                                                                    },
+                                                                                ),
+                                                                                Tee {
+                                                                                    inner: <tee 4>,
+                                                                                },
+                                                                            ),
+                                                                        },
+                                                                    },
+                                                                },
+                                                            },
+                                                        },
+                                                        Map {
+                                                            f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
+                                                            input: Tee {
+                                                                inner: <tee 7>: Filter {
+                                                                    f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: Ballot , u32) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (received_max_ballot , ballot_num) | * received_max_ballot <= Ballot { num : * ballot_num , proposer_id : p_id } }),
+                                                                    input: CrossSingleton(
+                                                                        Tee {
+                                                                            inner: <tee 0>,
+                                                                        },
+                                                                        Tee {
+                                                                            inner: <tee 4>,
+                                                                        },
+                                                                    ),
+                                                                },
+                                                            },
+                                                        },
+                                                    ),
+                                                },
                                             },
                                         },
                                     ),
                                 },
                                 Map {
-                                    f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
-                                    input: Tee {
-                                        inner: <tee 7>,
+                                    f: stageleft :: runtime_support :: fn1_type_hint :: < () , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
+                                    input: Source {
+                                        source: Interval(
+                                            stageleft :: runtime_support :: type_hint :: < core :: time :: Duration > ({ use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_send_timeout = 1u64 ; Duration :: from_secs (i_am_leader_send_timeout) }),
+                                        ),
+                                        location_kind: Cluster(
+                                            2,
+                                        ),
                                     },
                                 },
                             ),
@@ -295,7 +244,7 @@ expression: built.ir()
         input: DeferTick(
             Union(
                 Tee {
-                    inner: <tee 11>: Map {
+                    inner: <tee 8>: Map {
                         f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , ()) , i32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
                         input: CrossSingleton(
                             Union(
@@ -307,18 +256,18 @@ expression: built.ir()
                                             input: CrossSingleton(
                                                 Union(
                                                     Tee {
-                                                        inner: <tee 12>: Reduce {
+                                                        inner: <tee 9>: Reduce {
                                                             f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }),
                                                             input: Map {
                                                                 f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }),
                                                                 input: Tee {
-                                                                    inner: <tee 13>: FoldKeyed {
-                                                                        init: stageleft :: runtime_support :: fn0_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , LogValue { ballot : Ballot { num : 0 , id : ClusterId :: from_raw (0) } , value : Default :: default () }) }),
+                                                                    inner: <tee 10>: FoldKeyed {
+                                                                        init: stageleft :: runtime_support :: fn0_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , LogValue { ballot : Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } , value : Default :: default () }) }),
                                                                         acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_entry , new_entry | { let same_values = new_entry . value == curr_entry . 1 . value ; let higher_ballot = new_entry . ballot > curr_entry . 1 . ballot ; if same_values { curr_entry . 0 += 1 ; } if higher_ballot { curr_entry . 1 . ballot = new_entry . ballot ; if ! same_values { curr_entry . 0 = 1 ; curr_entry . 1 . value = new_entry . value ; } } } }),
                                                                         input: FlatMap {
-                                                                            f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , u32) , std :: collections :: hash_map :: IntoIter < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((_ , p1b) , _) | p1b . accepted . into_iter () }),
+                                                                            f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , std :: collections :: hash_map :: IntoIter < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1b | p1b . accepted . into_iter () }),
                                                                             input: Tee {
-                                                                                inner: <tee 8>,
+                                                                                inner: <tee 6>,
                                                                             },
                                                                         },
                                                                     },
@@ -345,7 +294,7 @@ expression: built.ir()
                                                             init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
                                                             acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
                                                             input: Tee {
-                                                                inner: <tee 14>: CycleSource {
+                                                                inner: <tee 11>: CycleSource {
                                                                     ident: Ident {
                                                                         sym: cycle_4,
                                                                     },
@@ -364,15 +313,15 @@ expression: built.ir()
                                         f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (num_payloads , next_slot) | next_slot + num_payloads as i32 }),
                                         input: CrossSingleton(
                                             Tee {
-                                                inner: <tee 15>: Map {
+                                                inner: <tee 12>: Map {
                                                     f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , ()) , usize > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
                                                     input: CrossSingleton(
                                                         Tee {
-                                                            inner: <tee 16>: Fold {
+                                                            inner: <tee 13>: Fold {
                                                                 init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
                                                                 acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
                                                                 input: Tee {
-                                                                    inner: <tee 17>: Map {
+                                                                    inner: <tee 14>: Map {
                                                                         f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }),
                                                                         input: Network {
                                                                             from_location: Cluster(
@@ -420,10 +369,10 @@ expression: built.ir()
                                                         Map {
                                                             f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
                                                             input: Tee {
-                                                                inner: <tee 18>: Filter {
+                                                                inner: <tee 15>: Filter {
                                                                     f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; | num_payloads | * num_payloads > 0 }),
                                                                     input: Tee {
-                                                                        inner: <tee 16>,
+                                                                        inner: <tee 13>,
                                                                     },
                                                                 },
                                                             },
@@ -432,7 +381,7 @@ expression: built.ir()
                                                 },
                                             },
                                             Tee {
-                                                inner: <tee 14>,
+                                                inner: <tee 11>,
                                             },
                                         ),
                                     },
@@ -441,7 +390,7 @@ expression: built.ir()
                                     f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , ()) , i32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
                                     input: CrossSingleton(
                                         Tee {
-                                            inner: <tee 14>,
+                                            inner: <tee 11>,
                                         },
                                         Map {
                                             f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
@@ -451,7 +400,7 @@ expression: built.ir()
                                                     init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
                                                     acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
                                                     input: Tee {
-                                                        inner: <tee 18>,
+                                                        inner: <tee 15>,
                                                     },
                                                 },
                                             },
@@ -462,7 +411,7 @@ expression: built.ir()
                             Map {
                                 f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
                                 input: Tee {
-                                    inner: <tee 7>,
+                                    inner: <tee 5>,
                                 },
                             },
                         ),
@@ -474,7 +423,7 @@ expression: built.ir()
                         f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
                         input: CrossSingleton(
                             Tee {
-                                inner: <tee 7>,
+                                inner: <tee 5>,
                             },
                             Map {
                                 f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
@@ -484,7 +433,7 @@ expression: built.ir()
                                         init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
                                         acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
                                         input: Tee {
-                                            inner: <tee 11>,
+                                            inner: <tee 8>,
                                         },
                                     },
                                 },
@@ -505,18 +454,18 @@ expression: built.ir()
         input: DeferTick(
             Difference(
                 Map {
-                    f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (usize , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , (_count , _p2b)) | slot }),
+                    f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _value) | slot }),
                     input: Tee {
-                        inner: <tee 19>: Filter {
-                            f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (i32 , (usize , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | (_slot , (count , _p2b)) | * count > f }),
+                        inner: <tee 16>: FilterMap {
+                            f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload)) , core :: option :: Option < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , value)) | if count > f { Some ((slot , value)) } else { None } }),
                             input: Tee {
-                                inner: <tee 20>: FoldKeyed {
-                                    init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , P2b { ballot : Ballot { num : 0 , id : ClusterId :: from_raw (0) } , max_ballot : Ballot { num : 0 , id : ClusterId :: from_raw (0) } , slot : 0 , value : Default :: default () }) }),
-                                    acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | accum , (_sender , p2b) | { accum . 0 += 1 ; accum . 1 = p2b ; } }),
+                                inner: <tee 17>: FoldKeyed {
+                                    init: stageleft :: runtime_support :: fn0_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (0 , Default :: default ()) }),
+                                    acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | accum , value | { accum . 0 += 1 ; accum . 1 = value ; } }),
                                     input: FilterMap {
-                                        f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , core :: option :: Option < (i32 , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (sender , p2b) | if p2b . ballot == p2b . max_ballot { Some ((p2b . slot , (sender , p2b))) } else { None } }),
+                                        f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , core :: option :: Option < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | if p2b . victory { Some (((p2b . slot , p2b . ballot) , p2b . value)) } else { None } }),
                                         input: Tee {
-                                            inner: <tee 21>: Union(
+                                            inner: <tee 18>: Union(
                                                 Tee {
                                                     inner: <tee 2>,
                                                 },
@@ -537,10 +486,10 @@ expression: built.ir()
                     },
                 },
                 Tee {
-                    inner: <tee 22>: FilterMap {
-                        f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (usize , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | (slot , (count , _p2b)) | if count == 2 * f + 1 { Some (slot) } else { None } }),
+                    inner: <tee 19>: FilterMap {
+                        f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload)) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | ((slot , _ballot) , (count , _p2b)) | if count == 2 * f + 1 { Some (slot) } else { None } }),
                         input: Tee {
-                            inner: <tee 20>,
+                            inner: <tee 17>,
                         },
                     },
                 },
@@ -556,16 +505,16 @@ expression: built.ir()
         ),
         input: DeferTick(
             Map {
-                f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_slot , (sender , p2b)) | (sender , p2b) }),
+                f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_slot , p2b) | p2b }),
                 input: AntiJoin(
                     Map {
-                        f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , (i32 , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (sender , p2b) | (p2b . slot , (sender , p2b)) }),
+                        f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | p2b | (p2b . slot , p2b) }),
                         input: Tee {
-                            inner: <tee 21>,
+                            inner: <tee 18>,
                         },
                     },
                     Tee {
-                        inner: <tee 22>,
+                        inner: <tee 19>,
                     },
                 ),
             },
@@ -589,355 +538,406 @@ expression: built.ir()
         location_kind: Cluster(
             2,
         ),
-        input: Network {
-            from_location: Cluster(
-                3,
-            ),
-            from_key: None,
-            to_location: Cluster(
-                2,
-            ),
-            to_key: None,
-            serialize_pipeline: Some(
-                Operator(
-                    Operator {
-                        path: "map",
-                        args: [
-                            "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }",
-                        ],
-                    },
+        input: Map {
+            f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }),
+            input: Network {
+                from_location: Cluster(
+                    3,
                 ),
-            ),
-            instantiate_fn: <network instantiate>,
-            deserialize_pipeline: Some(
-                Operator(
-                    Operator {
-                        path: "map",
-                        args: [
-                            "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }",
-                        ],
-                    },
+                from_key: None,
+                to_location: Cluster(
+                    2,
                 ),
-            ),
-            input: Map {
-                f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >)) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((p1a , max_ballot) , (_prev_checkpoint , log)) | (p1a . ballot . id , P1b { ballot : p1a . ballot , max_ballot , accepted : log }) }),
-                input: CrossSingleton(
-                    CrossSingleton(
-                        Tee {
-                            inner: <tee 23>: Inspect {
-                                f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | println ! ("Acceptor received P1a: {:?}" , p1a) }),
-                                input: Map {
-                                    f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1a) , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }),
-                                    input: Network {
-                                        from_location: Cluster(
-                                            2,
-                                        ),
-                                        from_key: None,
-                                        to_location: Cluster(
-                                            3,
-                                        ),
-                                        to_key: None,
-                                        serialize_pipeline: Some(
-                                            Operator(
-                                                Operator {
-                                                    path: "map",
-                                                    args: [
-                                                        "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1a) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1a > (& data) . unwrap () . into ()) }",
-                                                    ],
-                                                },
+                to_key: None,
+                serialize_pipeline: Some(
+                    Operator(
+                        Operator {
+                            path: "map",
+                            args: [
+                                "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }",
+                            ],
+                        },
+                    ),
+                ),
+                instantiate_fn: <network instantiate>,
+                deserialize_pipeline: Some(
+                    Operator(
+                        Operator {
+                            path: "map",
+                            args: [
+                                "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }",
+                            ],
+                        },
+                    ),
+                ),
+                input: Map {
+                    f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >)) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | ((p1a , max_ballot) , (_prev_checkpoint , log)) | (p1a . ballot . proposer_id , P1b { ballot : p1a . ballot , max_ballot , accepted : log }) }),
+                    input: CrossSingleton(
+                        CrossSingleton(
+                            Tee {
+                                inner: <tee 20>: Inspect {
+                                    f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | println ! ("Acceptor received P1a: {:?}" , p1a) }),
+                                    input: Map {
+                                        f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P1a) , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }),
+                                        input: Network {
+                                            from_location: Cluster(
+                                                2,
                                             ),
-                                        ),
-                                        instantiate_fn: <network instantiate>,
-                                        deserialize_pipeline: Some(
-                                            Operator(
-                                                Operator {
-                                                    path: "map",
-                                                    args: [
-                                                        "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1a > (& b) . unwrap ()) }",
-                                                    ],
-                                                },
+                                            from_key: None,
+                                            to_location: Cluster(
+                                                3,
                                             ),
-                                        ),
-                                        input: FlatMap {
-                                            f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }),
-                                            input: Map {
-                                                f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ballot_num | P1a { ballot : Ballot { num : ballot_num , id : p_id } } }),
-                                                input: Map {
-                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
-                                                    input: CrossSingleton(
-                                                        Tee {
-                                                            inner: <tee 4>,
-                                                        },
-                                                        Map {
-                                                            f: stageleft :: runtime_support :: fn1_type_hint :: < (tokio :: time :: Instant , core :: option :: Option < tokio :: time :: Instant >) , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
-                                                            input: Tee {
-                                                                inner: <tee 5>,
-                                                            },
+                                            to_key: None,
+                                            serialize_pipeline: Some(
+                                                Operator(
+                                                    Operator {
+                                                        path: "map",
+                                                        args: [
+                                                            "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P1a) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P1a > (& data) . unwrap () . into ()) }",
+                                                        ],
+                                                    },
+                                                ),
+                                            ),
+                                            instantiate_fn: <network instantiate>,
+                                            deserialize_pipeline: Some(
+                                                Operator(
+                                                    Operator {
+                                                        path: "map",
+                                                        args: [
+                                                            "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P1a > (& b) . unwrap ()) }",
+                                                        ],
+                                                    },
+                                                ),
+                                            ),
+                                            input: FlatMap {
+                                                f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }),
+                                                input: Inspect {
+                                                    f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | _ | println ! ("Proposer leader expired, sending P1a") }),
+                                                    input: Map {
+                                                        f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , hydroflow_plus_test :: cluster :: paxos :: P1a > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ballot_num | P1a { ballot : Ballot { num : ballot_num , proposer_id : p_id } } }),
+                                                        input: Map {
+                                                            f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
+                                                            input: CrossSingleton(
+                                                                Map {
+                                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , ()) , u32 > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
+                                                                    input: CrossSingleton(
+                                                                        Tee {
+                                                                            inner: <tee 4>,
+                                                                        },
+                                                                        Map {
+                                                                            f: stageleft :: runtime_support :: fn1_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
+                                                                            input: Filter {
+                                                                                f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , bool > ({ use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; move | latest_received_i_am_leader | { if let Some (latest_received_i_am_leader) = latest_received_i_am_leader { (Instant :: now () . duration_since (* latest_received_i_am_leader)) > Duration :: from_secs (i_am_leader_check_timeout) } else { true } } }),
+                                                                                input: Map {
+                                                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < tokio :: time :: Instant > , ()) , core :: option :: Option < tokio :: time :: Instant > > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
+                                                                                    input: CrossSingleton(
+                                                                                        Tee {
+                                                                                            inner: <tee 21>: Fold {
+                                                                                                init: stageleft :: runtime_support :: fn0_type_hint :: < core :: option :: Option < tokio :: time :: Instant > > ({ use crate :: __staged :: cluster :: paxos :: * ; | | None }),
+                                                                                                acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < tokio :: time :: Instant > , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | latest , _ | { * latest = Some (Instant :: now ()) ; } }),
+                                                                                                input: Persist(
+                                                                                                    Tee {
+                                                                                                        inner: <tee 3>,
+                                                                                                    },
+                                                                                                ),
+                                                                                            },
+                                                                                        },
+                                                                                        Map {
+                                                                                            f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
+                                                                                            input: Filter {
+                                                                                                f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | c | * c == 0 }),
+                                                                                                input: Fold {
+                                                                                                    init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
+                                                                                                    acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
+                                                                                                    input: Tee {
+                                                                                                        inner: <tee 5>,
+                                                                                                    },
+                                                                                                },
+                                                                                            },
+                                                                                        },
+                                                                                    ),
+                                                                                },
+                                                                            },
+                                                                        },
+                                                                    ),
+                                                                },
+                                                                Map {
+                                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < tokio :: time :: Instant , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
+                                                                    input: Source {
+                                                                        source: Stream(
+                                                                            { use hydroflow_plus :: __staged :: location :: * ; let delay = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout_delay_multiplier = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; Duration :: from_secs ((p_id . raw_id * i_am_leader_check_timeout_delay_multiplier as u32) . into ()) } ; let interval = { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_check_timeout = 1u64 ; Duration :: from_secs (i_am_leader_check_timeout) } ; tokio_stream :: wrappers :: IntervalStream :: new (tokio :: time :: interval_at (tokio :: time :: Instant :: now () + delay , interval)) },
+                                                                        ),
+                                                                        location_kind: Cluster(
+                                                                            2,
+                                                                        ),
+                                                                    },
+                                                                },
+                                                            ),
                                                         },
-                                                    ),
+                                                    },
                                                 },
                                             },
                                         },
                                     },
                                 },
                             },
-                        },
-                        Tee {
-                            inner: <tee 24>: Union(
-                                Reduce {
-                                    f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }),
-                                    input: Persist(
-                                        Map {
-                                            f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | p1a . ballot }),
-                                            input: Tee {
-                                                inner: <tee 23>,
+                            Tee {
+                                inner: <tee 22>: Union(
+                                    Reduce {
+                                        f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }),
+                                        input: Persist(
+                                            Map {
+                                                f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P1a , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; | p1a | p1a . ballot }),
+                                                input: Tee {
+                                                    inner: <tee 20>,
+                                                },
                                             },
-                                        },
-                                    ),
-                                },
-                                Persist(
-                                    Source {
-                                        source: Iter(
-                                            { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; Ballot { num : 0 , id : ClusterId :: from_raw (0) } } ; [e] },
-                                        ),
-                                        location_kind: Cluster(
-                                            3,
                                         ),
                                     },
+                                    Persist(
+                                        Source {
+                                            source: Iter(
+                                                { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } } ; [e] },
+                                            ),
+                                            location_kind: Cluster(
+                                                3,
+                                            ),
+                                        },
+                                    ),
                                 ),
-                            ),
-                        },
-                    ),
-                    Fold {
-                        init: stageleft :: runtime_support :: fn0_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (- 1 , HashMap :: new ()) }),
-                        acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >) , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , (new_checkpoint , p2a) | { if new_checkpoint != - 1 { for slot in * prev_checkpoint .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = new_checkpoint ; } else { if p2a . slot > * prev_checkpoint { match log . get (& p2a . slot) { None => { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } Some (prev_p2a) => { if p2a . ballot > prev_p2a . ballot { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } ; } } } }),
-                        input: Persist(
-                            Union(
-                                FilterMap {
-                                    f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < (i32 , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some ((- 1 , p2a)) } else { None } }),
-                                    input: CrossSingleton(
-                                        Tee {
-                                            inner: <tee 25>: Map {
-                                                f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }),
-                                                input: Network {
-                                                    from_location: Cluster(
-                                                        2,
-                                                    ),
-                                                    from_key: None,
-                                                    to_location: Cluster(
-                                                        3,
-                                                    ),
-                                                    to_key: None,
-                                                    serialize_pipeline: Some(
-                                                        Operator(
-                                                            Operator {
-                                                                path: "map",
-                                                                args: [
-                                                                    "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }",
-                                                                ],
-                                                            },
+                            },
+                        ),
+                        Fold {
+                            init: stageleft :: runtime_support :: fn0_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | | (- 1 , HashMap :: new ()) }),
+                            acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (i32 , std :: collections :: hash_map :: HashMap < i32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > >) , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | (prev_checkpoint , log) , (new_checkpoint , p2a) | { if new_checkpoint != - 1 { for slot in * prev_checkpoint .. new_checkpoint { log . remove (& slot) ; } * prev_checkpoint = new_checkpoint ; } else { if p2a . slot > * prev_checkpoint { match log . get (& p2a . slot) { None => { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } Some (prev_p2a) => { if p2a . ballot > prev_p2a . ballot { log . insert (p2a . slot , LogValue { ballot : p2a . ballot , value : p2a . value , } ,) ; } } } ; } } } }),
+                            input: Persist(
+                                Union(
+                                    FilterMap {
+                                        f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , core :: option :: Option < (i32 , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | if p2a . ballot >= max_ballot { Some ((- 1 , p2a)) } else { None } }),
+                                        input: CrossSingleton(
+                                            Tee {
+                                                inner: <tee 23>: Map {
+                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }),
+                                                    input: Network {
+                                                        from_location: Cluster(
+                                                            2,
                                                         ),
-                                                    ),
-                                                    instantiate_fn: <network instantiate>,
-                                                    deserialize_pipeline: Some(
-                                                        Operator(
-                                                            Operator {
-                                                                path: "map",
-                                                                args: [
-                                                                    "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }",
-                                                                ],
-                                                            },
+                                                        from_key: None,
+                                                        to_location: Cluster(
+                                                            3,
                                                         ),
-                                                    ),
-                                                    input: FlatMap {
-                                                        f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }),
-                                                        input: Map {
-                                                            f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }),
-                                                            input: CrossSingleton(
-                                                                Union(
-                                                                    Map {
-                                                                        f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }),
-                                                                        input: CrossSingleton(
-                                                                            Union(
-                                                                                FilterMap {
-                                                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ((slot , (count , entry)) , ballot_num) | if count <= f as u32 { Some (P2a { ballot : Ballot { num : ballot_num , id : p_id , } , slot , value : entry . value , }) } else { None } }),
-                                                                                    input: CrossSingleton(
-                                                                                        Tee {
-                                                                                            inner: <tee 13>,
-                                                                                        },
-                                                                                        Tee {
-                                                                                            inner: <tee 4>,
-                                                                                        },
-                                                                                    ),
-                                                                                },
-                                                                                Map {
-                                                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (slot , ballot_num) | P2a { ballot : Ballot { num : ballot_num , id : p_id } , slot , value : Default :: default () } }),
-                                                                                    input: CrossSingleton(
-                                                                                        Difference(
-                                                                                            FlatMap {
-                                                                                                f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: ops :: Range < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | 0 .. max_slot }),
-                                                                                                input: Tee {
-                                                                                                    inner: <tee 12>,
-                                                                                                },
+                                                        to_key: None,
+                                                        serialize_pipeline: Some(
+                                                            Operator(
+                                                                Operator {
+                                                                    path: "map",
+                                                                    args: [
+                                                                        "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }",
+                                                                    ],
+                                                                },
+                                                            ),
+                                                        ),
+                                                        instantiate_fn: <network instantiate>,
+                                                        deserialize_pipeline: Some(
+                                                            Operator(
+                                                                Operator {
+                                                                    path: "map",
+                                                                    args: [
+                                                                        "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }",
+                                                                    ],
+                                                                },
+                                                            ),
+                                                        ),
+                                                        input: FlatMap {
+                                                            f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }),
+                                                            input: Map {
+                                                                f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }),
+                                                                input: CrossSingleton(
+                                                                    Union(
+                                                                        Map {
+                                                                            f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , ()) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }),
+                                                                            input: CrossSingleton(
+                                                                                Union(
+                                                                                    FilterMap {
+                                                                                        f: stageleft :: runtime_support :: fn1_type_hint :: < ((i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , u32) , core :: option :: Option < hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | ((slot , (count , entry)) , ballot_num) | if count <= f as u32 { Some (P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id , } , slot , value : entry . value , }) } else { None } }),
+                                                                                        input: CrossSingleton(
+                                                                                            Tee {
+                                                                                                inner: <tee 10>,
                                                                                             },
-                                                                                            Map {
-                                                                                                f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }),
-                                                                                                input: Tee {
-                                                                                                    inner: <tee 13>,
+                                                                                            Tee {
+                                                                                                inner: <tee 4>,
+                                                                                            },
+                                                                                        ),
+                                                                                    },
+                                                                                    Map {
+                                                                                        f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (slot , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot , value : Default :: default () } }),
+                                                                                        input: CrossSingleton(
+                                                                                            Difference(
+                                                                                                FlatMap {
+                                                                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: ops :: Range < i32 > > ({ use crate :: __staged :: cluster :: paxos :: * ; | max_slot | 0 .. max_slot }),
+                                                                                                    input: Tee {
+                                                                                                        inner: <tee 9>,
+                                                                                                    },
+                                                                                                },
+                                                                                                Map {
+                                                                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (u32 , hydroflow_plus_test :: cluster :: paxos :: LogValue < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , _) | slot }),
+                                                                                                    input: Tee {
+                                                                                                        inner: <tee 10>,
+                                                                                                    },
                                                                                                 },
+                                                                                            ),
+                                                                                            Tee {
+                                                                                                inner: <tee 4>,
                                                                                             },
                                                                                         ),
-                                                                                        Tee {
-                                                                                            inner: <tee 4>,
+                                                                                    },
+                                                                                ),
+                                                                                Map {
+                                                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }),
+                                                                                    input: Filter {
+                                                                                        f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }),
+                                                                                        input: Fold {
+                                                                                            init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
+                                                                                            acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
+                                                                                            input: Tee {
+                                                                                                inner: <tee 11>,
+                                                                                            },
                                                                                         },
-                                                                                    ),
+                                                                                    },
                                                                                 },
                                                                             ),
-                                                                            Map {
-                                                                                f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }),
-                                                                                input: Filter {
-                                                                                    f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < usize , bool > ({ use hydroflow_plus :: __staged :: stream :: * ; | c | * c == 0 }),
-                                                                                    input: Fold {
-                                                                                        init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
-                                                                                        acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
-                                                                                        input: Tee {
+                                                                        },
+                                                                        Map {
+                                                                            f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , i32) , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (((index , payload) , next_slot) , ballot_num) | P2a { ballot : Ballot { num : ballot_num , proposer_id : p_id } , slot : next_slot + index as i32 , value : payload } }),
+                                                                            input: CrossSingleton(
+                                                                                CrossSingleton(
+                                                                                    Enumerate(
+                                                                                        Tee {
                                                                                             inner: <tee 14>,
                                                                                         },
-                                                                                    },
-                                                                                },
-                                                                            },
-                                                                        ),
-                                                                    },
-                                                                    Map {
-                                                                        f: stageleft :: runtime_support :: fn1_type_hint :: < (((usize , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , i32) , u32) , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (((index , payload) , next_slot) , ballot_num) | P2a { ballot : Ballot { num : ballot_num , id : p_id } , slot : next_slot + index as i32 , value : payload } }),
-                                                                        input: CrossSingleton(
-                                                                            CrossSingleton(
-                                                                                Enumerate(
+                                                                                    ),
                                                                                     Tee {
-                                                                                        inner: <tee 17>,
+                                                                                        inner: <tee 11>,
                                                                                     },
                                                                                 ),
                                                                                 Tee {
-                                                                                    inner: <tee 14>,
+                                                                                    inner: <tee 4>,
                                                                                 },
                                                                             ),
-                                                                            Tee {
-                                                                                inner: <tee 4>,
-                                                                            },
-                                                                        ),
+                                                                        },
+                                                                    ),
+                                                                    Map {
+                                                                        f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }),
+                                                                        input: Tee {
+                                                                            inner: <tee 5>,
+                                                                        },
                                                                     },
                                                                 ),
-                                                                Map {
-                                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }),
-                                                                    input: Tee {
-                                                                        inner: <tee 7>,
-                                                                    },
-                                                                },
-                                                            ),
+                                                            },
                                                         },
                                                     },
                                                 },
                                             },
-                                        },
-                                        Tee {
-                                            inner: <tee 24>,
-                                        },
-                                    ),
-                                },
-                                Map {
-                                    f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | min_seq | (min_seq , P2a { ballot : Ballot { num : 0 , id : ClusterId :: from_raw (0) } , slot : - 1 , value : Default :: default () }) }),
-                                    input: Delta(
-                                        Union(
-                                            Reduce {
-                                                f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new < * curr { * curr = new ; } } }),
-                                                input: Map {
-                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_sender , seq) | seq }),
+                                            Tee {
+                                                inner: <tee 22>,
+                                            },
+                                        ),
+                                    },
+                                    Map {
+                                        f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , (i32 , hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | min_seq | (min_seq , P2a { ballot : Ballot { num : 0 , proposer_id : ClusterId :: from_raw (0) } , slot : - 1 , value : Default :: default () }) }),
+                                        input: Delta(
+                                            Union(
+                                                Reduce {
+                                                    f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new < * curr { * curr = new ; } } }),
                                                     input: Map {
-                                                        f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , ()) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }),
-                                                        input: CrossSingleton(
-                                                            Tee {
-                                                                inner: <tee 26>: ReduceKeyed {
-                                                                    f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_seq , seq | { if seq > * curr_seq { * curr_seq = seq ; } } }),
-                                                                    input: Persist(
-                                                                        Network {
-                                                                            from_location: Cluster(
-                                                                                1,
-                                                                            ),
-                                                                            from_key: None,
-                                                                            to_location: Cluster(
-                                                                                3,
-                                                                            ),
-                                                                            to_key: None,
-                                                                            serialize_pipeline: Some(
-                                                                                Operator(
-                                                                                    Operator {
-                                                                                        path: "map",
-                                                                                        args: [
-                                                                                            "| (id , data) : (hydroflow_plus :: ClusterId < _ > , i32) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < i32 > (& data) . unwrap () . into ()) }",
-                                                                                        ],
-                                                                                    },
+                                                        f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_sender , seq) | seq }),
+                                                        input: Map {
+                                                            f: stageleft :: runtime_support :: fn1_type_hint :: < ((hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , ()) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) > ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }),
+                                                            input: CrossSingleton(
+                                                                Tee {
+                                                                    inner: <tee 24>: ReduceKeyed {
+                                                                        f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | curr_seq , seq | { if seq > * curr_seq { * curr_seq = seq ; } } }),
+                                                                        input: Persist(
+                                                                            Network {
+                                                                                from_location: Cluster(
+                                                                                    1,
                                                                                 ),
-                                                                            ),
-                                                                            instantiate_fn: <network instantiate>,
-                                                                            deserialize_pipeline: Some(
-                                                                                Operator(
-                                                                                    Operator {
-                                                                                        path: "map",
-                                                                                        args: [
-                                                                                            "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < i32 > (& b) . unwrap ()) }",
-                                                                                        ],
-                                                                                    },
+                                                                                from_key: None,
+                                                                                to_location: Cluster(
+                                                                                    3,
                                                                                 ),
-                                                                            ),
-                                                                            input: FlatMap {
-                                                                                f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }),
-                                                                                input: CycleSource {
-                                                                                    ident: Ident {
-                                                                                        sym: cycle_0,
-                                                                                    },
-                                                                                    location_kind: Cluster(
-                                                                                        1,
+                                                                                to_key: None,
+                                                                                serialize_pipeline: Some(
+                                                                                    Operator(
+                                                                                        Operator {
+                                                                                            path: "map",
+                                                                                            args: [
+                                                                                                "| (id , data) : (hydroflow_plus :: ClusterId < _ > , i32) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < i32 > (& data) . unwrap () . into ()) }",
+                                                                                            ],
+                                                                                        },
                                                                                     ),
+                                                                                ),
+                                                                                instantiate_fn: <network instantiate>,
+                                                                                deserialize_pipeline: Some(
+                                                                                    Operator(
+                                                                                        Operator {
+                                                                                            path: "map",
+                                                                                            args: [
+                                                                                                "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < i32 > (& b) . unwrap ()) }",
+                                                                                            ],
+                                                                                        },
+                                                                                    ),
+                                                                                ),
+                                                                                input: FlatMap {
+                                                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > > > (__hydroflow_plus_cluster_ids_3) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }),
+                                                                                    input: CycleSource {
+                                                                                        ident: Ident {
+                                                                                            sym: cycle_0,
+                                                                                        },
+                                                                                        location_kind: Cluster(
+                                                                                            1,
+                                                                                        ),
+                                                                                    },
                                                                                 },
                                                                             },
-                                                                        },
-                                                                    ),
+                                                                        ),
+                                                                    },
                                                                 },
-                                                            },
-                                                            Map {
-                                                                f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }),
-                                                                input: FilterMap {
-                                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received == f + 1 { Some (true) } else { None } }),
-                                                                    input: Fold {
-                                                                        init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
-                                                                        acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
-                                                                        input: Tee {
-                                                                            inner: <tee 26>,
+                                                                Map {
+                                                                    f: stageleft :: runtime_support :: fn1_type_hint :: < bool , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }),
+                                                                    input: FilterMap {
+                                                                        f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < bool > > ({ use crate :: __staged :: cluster :: paxos :: * ; let f = 1usize ; move | num_received | if num_received == f + 1 { Some (true) } else { None } }),
+                                                                        input: Fold {
+                                                                            init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
+                                                                            acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , i32) , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
+                                                                            input: Tee {
+                                                                                inner: <tee 24>,
+                                                                            },
                                                                         },
                                                                     },
                                                                 },
-                                                            },
-                                                        ),
+                                                            ),
+                                                        },
                                                     },
                                                 },
-                                            },
-                                            Persist(
-                                                Source {
-                                                    source: Iter(
-                                                        { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; - 1 } ; [e] },
-                                                    ),
-                                                    location_kind: Cluster(
-                                                        3,
-                                                    ),
-                                                },
+                                                Persist(
+                                                    Source {
+                                                        source: Iter(
+                                                            { use hydroflow_plus :: __staged :: location :: * ; let e = { use crate :: __staged :: cluster :: paxos :: * ; - 1 } ; [e] },
+                                                        ),
+                                                        location_kind: Cluster(
+                                                            3,
+                                                        ),
+                                                    },
+                                                ),
                                             ),
                                         ),
-                                    ),
-                                },
+                                    },
+                                ),
                             ),
-                        ),
-                    },
-                ),
+                        },
+                    ),
+                },
             },
         },
     },
@@ -948,46 +948,49 @@ expression: built.ir()
         location_kind: Cluster(
             2,
         ),
-        input: Network {
-            from_location: Cluster(
-                3,
-            ),
-            from_key: None,
-            to_location: Cluster(
-                2,
-            ),
-            to_key: None,
-            serialize_pipeline: Some(
-                Operator(
-                    Operator {
-                        path: "map",
-                        args: [
-                            "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }",
-                        ],
-                    },
+        input: Map {
+            f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Acceptor > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }),
+            input: Network {
+                from_location: Cluster(
+                    3,
                 ),
-            ),
-            instantiate_fn: <network instantiate>,
-            deserialize_pipeline: Some(
-                Operator(
-                    Operator {
-                        path: "map",
-                        args: [
-                            "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }",
-                        ],
-                    },
+                from_key: None,
+                to_location: Cluster(
+                    2,
                 ),
-            ),
-            input: Map {
-                f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | (p2a . ballot . id , P2b { ballot : p2a . ballot , max_ballot , slot : p2a . slot , value : p2a . value }) }),
-                input: CrossSingleton(
-                    Tee {
-                        inner: <tee 25>,
-                    },
-                    Tee {
-                        inner: <tee 24>,
-                    },
+                to_key: None,
+                serialize_pipeline: Some(
+                    Operator(
+                        Operator {
+                            path: "map",
+                            args: [
+                                "| (id , data) : (hydroflow_plus :: ClusterId < _ > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) | { (id . raw_id , hydroflow_plus :: runtime_support :: bincode :: serialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& data) . unwrap () . into ()) }",
+                            ],
+                        },
+                    ),
                 ),
+                instantiate_fn: <network instantiate>,
+                deserialize_pipeline: Some(
+                    Operator(
+                        Operator {
+                            path: "map",
+                            args: [
+                                "| res | { let (id , b) = res . unwrap () ; (hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Acceptor > :: from_raw (id) , hydroflow_plus :: runtime_support :: bincode :: deserialize :: < hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > > (& b) . unwrap ()) }",
+                            ],
+                        },
+                    ),
+                ),
+                input: Map {
+                    f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos :: P2a < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload > , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (p2a , max_ballot) | (p2a . ballot . proposer_id , P2b { victory : p2a . ballot == max_ballot , ballot : p2a . ballot , slot : p2a . slot , value : p2a . value }) }),
+                    input: CrossSingleton(
+                        Tee {
+                            inner: <tee 23>,
+                        },
+                        Tee {
+                            inner: <tee 22>,
+                        },
+                    ),
+                },
             },
         },
     },
@@ -1005,10 +1008,10 @@ expression: built.ir()
                     f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq > * highest_seq }),
                     input: CrossSingleton(
                         Tee {
-                            inner: <tee 27>: Sort(
+                            inner: <tee 25>: Sort(
                                 Union(
                                     Tee {
-                                        inner: <tee 28>: Map {
+                                        inner: <tee 26>: Map {
                                             f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }),
                                             input: Network {
                                                 from_location: Cluster(
@@ -1045,10 +1048,10 @@ expression: built.ir()
                                                     input: Map {
                                                         f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (slot , data) | ReplicaPayload { seq : slot , key : data . key , value : data . value } }),
                                                         input: Map {
-                                                            f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , (usize , hydroflow_plus_test :: cluster :: paxos :: P2b < hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload >)) , (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (_slot , (_count , p2b)) | (p2b . slot , p2b . value) }),
+                                                            f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) , (i32 , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos :: * ; | (slot , value) | (slot , value) }),
                                                             input: AntiJoin(
                                                                 Tee {
-                                                                    inner: <tee 19>,
+                                                                    inner: <tee 16>,
                                                                 },
                                                                 CycleSource {
                                                                     ident: Ident {
@@ -1077,12 +1080,12 @@ expression: built.ir()
                             ),
                         },
                         Tee {
-                            inner: <tee 29>: Fold {
+                            inner: <tee 27>: Fold {
                                 init: stageleft :: runtime_support :: fn0_type_hint :: < i32 > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | - 1 }),
                                 acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | filled_slot , (sorted_payload , highest_seq) | { let next_slot = std :: cmp :: max (* filled_slot , highest_seq) ; * filled_slot = if sorted_payload . seq == next_slot + 1 { sorted_payload . seq } else { * filled_slot } ; } }),
                                 input: CrossSingleton(
                                     Tee {
-                                        inner: <tee 27>,
+                                        inner: <tee 25>,
                                     },
                                     Union(
                                         CycleSource {
@@ -1119,23 +1122,23 @@ expression: built.ir()
         ),
         input: DeferTick(
             Tee {
-                inner: <tee 30>: Map {
+                inner: <tee 28>: Map {
                     f: stageleft :: runtime_support :: fn1_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , i32 > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_kv_store , highest_seq) | highest_seq }),
                     input: Fold {
                         init: stageleft :: runtime_support :: fn0_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | (HashMap :: < u32 , String > :: new () , - 1) }),
                         acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < (std :: collections :: hash_map :: HashMap < u32 , std :: string :: String > , i32) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | state , payload | { let kv_store = & mut state . 0 ; let last_seq = & mut state . 1 ; kv_store . insert (payload . key , payload . value) ; debug_assert ! (payload . seq == * last_seq + 1 , "Hole in log between seq {} and {}" , * last_seq , payload . seq) ; * last_seq = payload . seq ; } }),
                         input: Persist(
                             Tee {
-                                inner: <tee 31>: Map {
+                                inner: <tee 29>: Map {
                                     f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , _) | { sorted_payload } }),
                                     input: Filter {
                                         f: stageleft :: runtime_support :: fn1_borrow_type_hint :: < (hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , i32) , bool > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sorted_payload , highest_seq) | sorted_payload . seq <= * highest_seq }),
                                         input: CrossSingleton(
                                             Tee {
-                                                inner: <tee 27>,
+                                                inner: <tee 25>,
                                             },
                                             Tee {
-                                                inner: <tee 29>,
+                                                inner: <tee 27>,
                                             },
                                         ),
                                     },
@@ -1156,7 +1159,7 @@ expression: built.ir()
         ),
         input: DeferTick(
             Tee {
-                inner: <tee 32>: FilterMap {
+                inner: <tee 30>: FilterMap {
                     f: stageleft :: runtime_support :: fn1_type_hint :: < (i32 , i32) , core :: option :: Option < i32 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let checkpoint_frequency = 1usize ; move | (max_checkpointed_seq , new_highest_seq) | if new_highest_seq - max_checkpointed_seq >= checkpoint_frequency as i32 { Some (new_highest_seq) } else { None } }),
                     input: CrossSingleton(
                         Union(
@@ -1185,7 +1188,7 @@ expression: built.ir()
                             ),
                         ),
                         Tee {
-                            inner: <tee 30>,
+                            inner: <tee 28>,
                         },
                     ),
                 },
@@ -1200,7 +1203,7 @@ expression: built.ir()
             1,
         ),
         input: Tee {
-            inner: <tee 32>,
+            inner: <tee 30>,
         },
     },
     CycleSink {
@@ -1213,7 +1216,7 @@ expression: built.ir()
         input: DeferTick(
             AntiJoin(
                 Tee {
-                    inner: <tee 33>: Union(
+                    inner: <tee 31>: Union(
                         Map {
                             f: stageleft :: runtime_support :: fn1_type_hint :: < (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) , (u32 , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica >) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (sender , replica_payload) | (replica_payload . key , sender) }),
                             input: Network {
@@ -1249,7 +1252,7 @@ expression: built.ir()
                                 input: Map {
                                     f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > , hydroflow_plus_test :: cluster :: paxos_bench :: ReplicaPayload) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | payload | (ClusterId :: from_raw (payload . value . parse :: < u32 > () . unwrap ()) , payload) }),
                                     input: Tee {
-                                        inner: <tee 28>,
+                                        inner: <tee 26>,
                                     },
                                 },
                             },
@@ -1265,13 +1268,13 @@ expression: built.ir()
                     ),
                 },
                 Tee {
-                    inner: <tee 34>: FilterMap {
+                    inner: <tee 32>: FilterMap {
                         f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , usize) , core :: option :: Option < u32 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let f = 1usize ; move | (key , count) | { if count == f + 1 { Some (key) } else { None } } }),
                         input: FoldKeyed {
                             init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | | 0 }),
                             acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Replica > , () > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | curr_count , _sender | { * curr_count += 1 ; } }),
                             input: Tee {
-                                inner: <tee 33>,
+                                inner: <tee 31>,
                             },
                         },
                     },
@@ -1292,7 +1295,7 @@ expression: built.ir()
                 input: Union(
                     Union(
                         Tee {
-                            inner: <tee 35>: CycleSource {
+                            inner: <tee 33>: CycleSource {
                                 ident: Ident {
                                     sym: cycle_2,
                                 },
@@ -1306,9 +1309,9 @@ expression: built.ir()
                             input: Map {
                                 f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: time :: SystemTime > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | SystemTime :: now () }),
                                 input: Tee {
-                                    inner: <tee 36>: Delta(
+                                    inner: <tee 34>: Delta(
                                         Tee {
-                                            inner: <tee 37>: Reduce {
+                                            inner: <tee 35>: Reduce {
                                                 f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , hydroflow_plus_test :: cluster :: paxos :: Ballot , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }),
                                                 input: Persist(
                                                     Inspect {
@@ -1348,13 +1351,13 @@ expression: built.ir()
                                                                 input: FlatMap {
                                                                     f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: slice :: Iter < hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > , _ > > ({ use hydroflow_plus :: __staged :: stream :: * ; let ids = unsafe { :: std :: mem :: transmute :: < _ , & :: std :: vec :: Vec < hydroflow_plus :: ClusterId < hydroflow_plus_test :: cluster :: paxos_bench :: Client > > > (__hydroflow_plus_cluster_ids_0) } ; | b | ids . iter () . map (move | id | (:: std :: clone :: Clone :: clone (id) , :: std :: clone :: Clone :: clone (& b))) }),
                                                                     input: Map {
-                                                                        f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , u32) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (_is_leader , ballot_num) | Ballot { num : ballot_num , id : p_id } }),
+                                                                        f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , u32) , hydroflow_plus_test :: cluster :: paxos :: Ballot > ({ use crate :: __staged :: cluster :: paxos :: * ; let p_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos :: Proposer > :: from_raw (__hydroflow_plus_cluster_self_id_2) ; move | (_is_leader , ballot_num) | Ballot { num : ballot_num , proposer_id : p_id } }),
                                                                         input: CrossSingleton(
                                                                             Map {
                                                                                 f: stageleft :: runtime_support :: fn1_type_hint :: < (bool , ()) , bool > ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }),
                                                                                 input: CrossSingleton(
                                                                                     Tee {
-                                                                                        inner: <tee 7>,
+                                                                                        inner: <tee 5>,
                                                                                     },
                                                                                     Map {
                                                                                         f: stageleft :: runtime_support :: fn1_type_hint :: < usize , () > ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }),
@@ -1364,7 +1367,7 @@ expression: built.ir()
                                                                                                 init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
                                                                                                 acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , i32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
                                                                                                 input: Tee {
-                                                                                                    inner: <tee 14>,
+                                                                                                    inner: <tee 11>,
                                                                                                 },
                                                                                             },
                                                                                         },
@@ -1389,10 +1392,10 @@ expression: built.ir()
                         },
                     ),
                     Tee {
-                        inner: <tee 38>: Map {
+                        inner: <tee 36>: Map {
                             f: stageleft :: runtime_support :: fn1_type_hint :: < u32 , (usize , std :: time :: SystemTime) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | key | (key as usize , SystemTime :: now ()) }),
                             input: Tee {
-                                inner: <tee 34>,
+                                inner: <tee 32>,
                             },
                         },
                     },
@@ -1405,7 +1408,7 @@ expression: built.ir()
         input: CrossSingleton(
             CrossSingleton(
                 Tee {
-                    inner: <tee 39>: Source {
+                    inner: <tee 37>: Source {
                         source: Interval(
                             { use crate :: __staged :: cluster :: paxos_bench :: * ; Duration :: from_secs (1) },
                         ),
@@ -1423,10 +1426,10 @@ expression: built.ir()
                                 f: stageleft :: runtime_support :: fn1_type_hint :: < (usize , (std :: time :: SystemTime , std :: time :: SystemTime)) , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | (_virtual_id , (prev_time , curr_time)) | Some (curr_time . duration_since (prev_time) . unwrap () . as_micros ()) }),
                                 input: Join(
                                     Tee {
-                                        inner: <tee 35>,
+                                        inner: <tee 33>,
                                     },
                                     Tee {
-                                        inner: <tee 38>,
+                                        inner: <tee 36>,
                                     },
                                 ),
                             },
@@ -1434,7 +1437,7 @@ expression: built.ir()
                                 Map {
                                     f: stageleft :: runtime_support :: fn1_type_hint :: < () , core :: option :: Option < u128 > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | None }),
                                     input: Tee {
-                                        inner: <tee 39>,
+                                        inner: <tee 37>,
                                     },
                                 },
                             ),
@@ -1456,7 +1459,7 @@ expression: built.ir()
                                         init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
                                         acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , u32 , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
                                         input: Tee {
-                                            inner: <tee 34>,
+                                            inner: <tee 32>,
                                         },
                                     },
                                     Map {
@@ -1467,7 +1470,7 @@ expression: built.ir()
                                                 init: stageleft :: runtime_support :: fn0_type_hint :: < usize > ({ use hydroflow_plus :: __staged :: stream :: * ; | | 0usize }),
                                                 acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , () , () > ({ use hydroflow_plus :: __staged :: stream :: * ; | count , _ | * count += 1 }),
                                                 input: Tee {
-                                                    inner: <tee 39>,
+                                                    inner: <tee 37>,
                                                 },
                                             },
                                         },
@@ -1479,7 +1482,7 @@ expression: built.ir()
                             Map {
                                 f: stageleft :: runtime_support :: fn1_type_hint :: < () , (usize , bool) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; | _ | (0 , true) }),
                                 input: Tee {
-                                    inner: <tee 39>,
+                                    inner: <tee 37>,
                                 },
                             },
                         ),
@@ -1499,17 +1502,17 @@ expression: built.ir()
             FlatMap {
                 f: stageleft :: runtime_support :: fn1_type_hint :: < hydroflow_plus_test :: cluster :: paxos :: Ballot , std :: iter :: Map < std :: ops :: Range < usize > , _ > > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; let num_clients_per_node = 1usize ; move | leader_ballot | (0 .. num_clients_per_node) . map (move | i | (leader_ballot . leader_id () , ClientPayload { key : i as u32 , value : c_id . raw_id . to_string () })) }),
                 input: Tee {
-                    inner: <tee 36>,
+                    inner: <tee 34>,
                 },
             },
             Map {
                 f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , hydroflow_plus_test :: cluster :: paxos :: Ballot) , (hydroflow_plus :: location :: ClusterId < hydroflow_plus_test :: cluster :: paxos :: Proposer > , hydroflow_plus_test :: cluster :: paxos_bench :: ClientPayload) > ({ use crate :: __staged :: cluster :: paxos_bench :: * ; let c_id = hydroflow_plus :: ClusterId :: < hydroflow_plus_test :: cluster :: paxos_bench :: Client > :: from_raw (__hydroflow_plus_cluster_self_id_0) ; move | (key , leader_ballot) | (leader_ballot . leader_id () , ClientPayload { key , value : c_id . raw_id . to_string () }) }),
                 input: CrossSingleton(
                     Tee {
-                        inner: <tee 34>,
+                        inner: <tee 32>,
                     },
                     Tee {
-                        inner: <tee 37>,
+                        inner: <tee 35>,
                     },
                 ),
             },