diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs
index 83f502b17e7..8c3c89a2a1b 100644
--- a/hydroflow_plus_test/src/cluster/paxos.rs
+++ b/hydroflow_plus_test/src/cluster/paxos.rs
@@ -1,4 +1,5 @@
use std::collections::HashMap;
+use std::fmt::Debug;
use std::time::Duration;
use hydroflow_plus::*;
@@ -13,7 +14,7 @@ pub struct Proposer {}
pub struct Acceptor {}
pub trait PaxosPayload:
- Serialize + DeserializeOwned + PartialEq + Eq + Default + Clone + std::fmt::Debug
+ Serialize + DeserializeOwned + PartialEq + Eq + Default + Clone + Debug
{
}
@@ -42,10 +43,10 @@ struct LogValue
{
}
#[derive(Serialize, Deserialize, Clone, Debug)]
-struct P1b
{
+struct P1b {
ballot: Ballot,
max_ballot: Ballot,
- accepted: HashMap>,
+ accepted: L,
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
@@ -99,21 +100,28 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
.for_each(q!(|s| println!("{}", s)));
let (a_to_proposers_p2b_complete_cycle, a_to_proposers_p2b_forward_reference) =
- proposers.forward_ref::>();
+ proposers.forward_ref::, _, _, _>>();
let (a_log_complete_cycle, a_log_forward_reference) =
- acceptors.tick_forward_ref::>();
+ acceptors.tick_forward_ref::>), _, _, _>>();
- let (p_ballot_num, p_is_leader, p_max_slot, p_log_to_try_commit, p_log_holes, a_max_ballot) =
- leader_election(
- &proposers,
- &acceptors,
- f,
- i_am_leader_send_timeout,
- i_am_leader_check_timeout,
- i_am_leader_check_timeout_delay_multiplier,
- a_to_proposers_p2b_forward_reference,
- a_log_forward_reference,
- );
+ let (p_ballot_num, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election(
+ &proposers,
+ &acceptors,
+ f,
+ i_am_leader_send_timeout,
+ i_am_leader_check_timeout,
+ i_am_leader_check_timeout_delay_multiplier,
+ a_to_proposers_p2b_forward_reference.map(q!(|p2b| p2b.ballot)),
+ a_log_forward_reference.map(q!(|(_ckpnt, log)| log.clone())),
+ );
+
+ let (p_log_to_try_commit, p_max_slot, p_log_holes) =
+ recommit_after_leader_election(&proposers, p_relevant_p1bs, p_ballot_num.clone(), f);
+
+ let p_log_to_recommit = p_log_to_try_commit
+ .union(p_log_holes)
+ .continue_unless(p_is_leader.clone().defer_tick())
+ .continue_if(p_is_leader.clone()); // Only resend p1b stuff once the moment we become leader.
let c_to_proposers = c_to_proposers(&proposers);
@@ -126,13 +134,12 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
p_ballot_num,
p_is_leader,
p_max_slot,
- p_log_to_try_commit,
- p_log_holes,
+ p_log_to_recommit,
f,
a_max_ballot,
);
- a_log_complete_cycle.complete(a_log.clone());
+ a_log_complete_cycle.complete(a_log);
a_to_proposers_p2b_complete_cycle.complete(a_to_proposers_p2b);
(
@@ -148,28 +155,26 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
clippy::too_many_arguments,
reason = "internal paxos code // TODO"
)]
-fn leader_election<'a, P: PaxosPayload>(
+fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>(
proposers: &Cluster<'a, Proposer>,
acceptors: &Cluster<'a, Acceptor>,
f: usize,
i_am_leader_send_timeout: u64,
i_am_leader_check_timeout: u64,
i_am_leader_check_timeout_delay_multiplier: usize,
- a_to_proposers_p2b: Stream, Unbounded, NoTick, Cluster<'a, Proposer>>,
- a_log: Singleton<(i32, HashMap>), Bounded, Tick, Cluster<'a, Acceptor>>,
+ p_received_p2b_ballots: Stream>,
+ a_log: Singleton>,
) -> (
Singleton>,
Optional>,
- Optional>,
- Stream, Bounded, Tick, Cluster<'a, Proposer>>,
- Stream, Bounded, Tick, Cluster<'a, Proposer>>,
+ Stream, Bounded, Tick, Cluster<'a, Proposer>>,
Singleton>,
) {
- let (a_to_proposers_p1b_complete_cycle, a_to_proposers_p1b) =
- proposers.forward_ref::>();
- let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader) =
+ let (a_to_proposers_p1b_complete_cycle, a_to_proposers_p1b_forward_ref) =
+ proposers.forward_ref::, _, _, _>>();
+ let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader_forward_ref) =
proposers.forward_ref::>();
- let (p_is_leader_complete_cycle, p_is_leader) =
+ let (p_is_leader_complete_cycle, p_is_leader_forward_ref) =
proposers.tick_forward_ref::>();
// a_to_proposers_p2b.clone().for_each(q!(|(_, p2b): (u32, P2b)| println!("Proposer received P2b: {:?}", p2b)));
// p_to_proposers_i_am_leader.clone().for_each(q!(|ballot: Ballot| println!("Proposer received I am leader: {:?}", ballot)));
@@ -177,29 +182,36 @@ fn leader_election<'a, P: PaxosPayload>(
let p_received_max_ballot = p_max_ballot(
proposers,
- a_to_proposers_p1b,
- a_to_proposers_p2b,
- p_to_proposers_i_am_leader.clone(),
+ a_to_proposers_p1b_forward_ref.map(q!(|p1a| p1a.max_ballot)),
+ p_received_p2b_ballots,
+ p_to_proposers_i_am_leader_forward_ref,
);
let (p_ballot_num, p_has_largest_ballot) =
p_ballot_calc(proposers, p_received_max_ballot.latest_tick());
- let (p_to_proposers_i_am_leader_from_others, p_to_acceptors_p1a) = p_p1a(
- p_ballot_num.clone(),
- p_is_leader.clone(),
+ let (p_to_proposers_i_am_leader, p_trigger_election) = p_leader_heartbeat(
proposers,
- acceptors,
+ p_is_leader_forward_ref,
+ p_ballot_num.clone(),
i_am_leader_send_timeout,
i_am_leader_check_timeout,
i_am_leader_check_timeout_delay_multiplier,
);
- p_to_proposers_i_am_leader_complete_cycle.complete(p_to_proposers_i_am_leader_from_others);
+
+ p_to_proposers_i_am_leader_complete_cycle.complete(p_to_proposers_i_am_leader);
+
+ let p_to_acceptors_p1a = p_p1a(
+ p_ballot_num.clone(),
+ p_trigger_election,
+ proposers,
+ acceptors,
+ );
let (a_max_ballot, a_to_proposers_p1b) =
acceptor_p1(acceptors, p_to_acceptors_p1a, a_log, proposers);
a_to_proposers_p1b_complete_cycle.complete(a_to_proposers_p1b.clone());
- let (p_is_leader, p_log_to_try_commit, p_max_slot, p_log_holes) = p_p1b(
+ let (p_is_leader, p_relevant_p1bs) = p_p1b(
proposers,
a_to_proposers_p1b.inspect(q!(|p1b| println!("Proposer received P1b: {:?}", p1b))),
p_ballot_num.clone(),
@@ -208,25 +220,16 @@ fn leader_election<'a, P: PaxosPayload>(
);
p_is_leader_complete_cycle.complete(p_is_leader.clone());
- (
- p_ballot_num,
- p_is_leader,
- p_max_slot,
- p_log_to_try_commit,
- p_log_holes,
- a_max_ballot,
- )
+ (p_ballot_num, p_is_leader, p_relevant_p1bs, a_max_ballot)
}
// Proposer logic to calculate the largest ballot received so far.
-fn p_max_ballot<'a, P: PaxosPayload>(
+fn p_max_ballot<'a>(
proposers: &Cluster<'a, Proposer>,
- a_to_proposers_p1b: Stream, Unbounded, NoTick, Cluster<'a, Proposer>>,
- a_to_proposers_p2b: Stream, Unbounded, NoTick, Cluster<'a, Proposer>>,
+ p_received_p1b_ballots: Stream>,
+ p_received_p2b_ballots: Stream>,
p_to_proposers_i_am_leader: Stream>,
) -> Singleton> {
- let p_received_p1b_ballots = a_to_proposers_p1b.clone().map(q!(|p1b| p1b.max_ballot));
- let p_received_p2b_ballots = a_to_proposers_p2b.clone().map(q!(|p2b| p2b.ballot));
p_received_p1b_ballots
.union(p_received_p2b_ballots)
.union(p_to_proposers_i_am_leader)
@@ -308,24 +311,22 @@ fn p_leader_expired<'a>(
}))
}
-// Proposer logic to send "I am leader" messages periodically to other proposers, or send p1a to acceptors if other leaders expired.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
-fn p_p1a<'a>(
- p_ballot_num: Singleton>,
- p_is_leader: Optional>,
+fn p_leader_heartbeat<'a>(
proposers: &Cluster<'a, Proposer>,
- acceptors: &Cluster<'a, Acceptor>,
+ p_is_leader: Optional>,
+ p_ballot_num: Singleton>,
i_am_leader_send_timeout: u64, // How often to heartbeat
i_am_leader_check_timeout: u64, // How often to check if heartbeat expired
i_am_leader_check_timeout_delay_multiplier: usize, /* Initial delay, multiplied by proposer pid, to stagger proposers checking for timeouts */
) -> (
Stream>,
- Stream>,
+ Optional