Skip to content

Commit

Permalink
fix(paxos): be more careful about which parts of proposer and accepto…
Browse files Browse the repository at this point in the history
…r have to be maintained atomically
  • Loading branch information
shadaj committed Oct 30, 2024
1 parent 9e75d82 commit b459ec9
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 126 deletions.
10 changes: 10 additions & 0 deletions hydroflow_plus/src/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ pub trait Location<'a> {
)
}

fn singleton_each_tick<T: Clone>(
&self,
e: impl Quoted<'a, T>,
) -> Singleton<T, Bounded, Tick, Self>
where
Self: Sized,
{
self.singleton(e).latest_tick()
}

fn singleton_first_tick<T: Clone>(
&self,
e: impl Quoted<'a, T>,
Expand Down
87 changes: 47 additions & 40 deletions hydroflow_plus_test/src/cluster/paxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ fn leader_election<'a, P: PaxosPayload>(
Optional<i32, Bounded, Tick, Cluster<'a, Proposer>>,
Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
Singleton<Ballot, Unbounded, NoTick, Cluster<'a, Acceptor>>,
Singleton<Ballot, Bounded, Tick, Cluster<'a, Acceptor>>,
) {
let (a_to_proposers_p1b_complete_cycle, a_to_proposers_p1b) =
proposers.forward_ref::<Stream<_, _, _, _>>();
Expand Down Expand Up @@ -192,17 +192,8 @@ fn leader_election<'a, P: PaxosPayload>(
);
p_to_proposers_i_am_leader_complete_cycle.complete(p_to_proposers_i_am_leader_from_others);

let a_max_ballot = p_to_acceptors_p1a
.clone()
.inspect(q!(|p1a| println!("Acceptor received P1a: {:?}", p1a)))
.map(q!(|p1a| p1a.ballot))
.max()
.unwrap_or(acceptors.singleton(q!(Ballot {
num: 0,
proposer_id: ClusterId::from_raw(0)
})));

let a_to_proposers_p1b = acceptor_p1(p_to_acceptors_p1a, &a_max_ballot, a_log, proposers);
let (a_max_ballot, a_to_proposers_p1b) =
acceptor_p1(acceptors, p_to_acceptors_p1a, a_log, proposers);
a_to_proposers_p1b_complete_cycle.complete(a_to_proposers_p1b.clone());

let (p_is_leader, p_log_to_try_commit, p_max_slot, p_log_holes) = p_p1b(
Expand Down Expand Up @@ -254,7 +245,7 @@ fn p_ballot_calc<'a>(
) {
let p_id = proposers.self_id();
let (p_ballot_num_complete_cycle, p_ballot_num) =
proposers.tick_cycle_with_initial(proposers.singleton(q!(0)).latest_tick());
proposers.tick_cycle_with_initial(proposers.singleton_each_tick(q!(0)));

let p_new_ballot_num = p_received_max_ballot
.clone()
Expand Down Expand Up @@ -302,7 +293,6 @@ fn p_leader_expired<'a>(
);

p_latest_received_i_am_leader
.clone()
.latest_tick()
.continue_unless(p_is_leader)
.filter(q!(move |latest_received_i_am_leader| {
Expand Down Expand Up @@ -376,25 +366,42 @@ fn p_p1a<'a>(

#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn acceptor_p1<'a, P: PaxosPayload>(
acceptors: &Cluster<'a, Acceptor>,
p_to_acceptors_p1a: Stream<P1a, Unbounded, NoTick, Cluster<'a, Acceptor>>,
a_max_ballot: &Singleton<Ballot, Unbounded, NoTick, Cluster<'a, Acceptor>>,
a_log: Singleton<(i32, HashMap<i32, LogValue<P>>), Bounded, Tick, Cluster<'a, Acceptor>>,
proposers: &Cluster<'a, Proposer>,
) -> Stream<P1b<P>, Unbounded, NoTick, Cluster<'a, Proposer>> {
p_to_acceptors_p1a
.tick_batch()
.cross_singleton(a_max_ballot.clone().latest_tick())
.cross_singleton(a_log)
.map(q!(|((p1a, max_ballot), (_prev_checkpoint, log))| (
p1a.ballot.proposer_id,
P1b {
ballot: p1a.ballot,
max_ballot,
accepted: log
}
)))
.all_ticks()
.send_bincode_interleaved(proposers)
) -> (
Singleton<Ballot, Bounded, Tick, Cluster<'a, Acceptor>>,
Stream<P1b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
) {
let p_to_acceptors_p1a = p_to_acceptors_p1a.tick_batch();
let a_max_ballot = p_to_acceptors_p1a
.clone()
.inspect(q!(|p1a| println!("Acceptor received P1a: {:?}", p1a)))
.persist()
.map(q!(|p1a| p1a.ballot))
.max()
.unwrap_or(acceptors.singleton_each_tick(q!(Ballot {
num: 0,
proposer_id: ClusterId::from_raw(0)
})));

(
a_max_ballot.clone(),
p_to_acceptors_p1a
.cross_singleton(a_max_ballot)
.cross_singleton(a_log)
.map(q!(|((p1a, max_ballot), (_prev_checkpoint, log))| (
p1a.ballot.proposer_id,
P1b {
ballot: p1a.ballot,
max_ballot,
accepted: log
}
)))
.all_ticks()
.send_bincode_interleaved(proposers),
)
}

// Proposer logic for processing p1bs, determining if the proposer is now the leader, which uncommitted messages to commit, what the maximum slot is in the p1bs, and which no-ops to commit to fill log holes.
Expand Down Expand Up @@ -515,7 +522,7 @@ fn sequence_payload<'a, P: PaxosPayload, R>(
p_log_holes: Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
f: usize,

a_max_ballot: Singleton<Ballot, Unbounded, NoTick, Cluster<'a, Acceptor>>,
a_max_ballot: Singleton<Ballot, Bounded, Tick, Cluster<'a, Acceptor>>,
) -> (
Stream<Ballot, Unbounded, NoTick, Cluster<'a, Proposer>>,
Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>>,
Expand Down Expand Up @@ -597,7 +604,7 @@ fn p_p2a<'a, P: PaxosPayload>(
let (p_next_slot_complete_cycle, p_next_slot) =
proposers.tick_cycle::<Optional<i32, _, _, _>>();
let p_next_slot_after_reconciling_p1bs = p_max_slot
.unwrap_or(proposers.singleton(q!(-1)).latest_tick())
.unwrap_or(proposers.singleton_each_tick(q!(-1)))
// .inspect(q!(|max_slot| println!("{} p_max_slot: {:?}", context.current_tick(), max_slot)))
.continue_unless(p_next_slot.clone())
.map(q!(|max_slot| max_slot + 1));
Expand Down Expand Up @@ -652,7 +659,7 @@ fn p_p2a<'a, P: PaxosPayload>(

#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn acceptor_p2<'a, P: PaxosPayload, R>(
a_max_ballot: Singleton<Ballot, Unbounded, NoTick, Cluster<'a, Acceptor>>,
a_max_ballot: Singleton<Ballot, Bounded, Tick, Cluster<'a, Acceptor>>,
p_to_acceptors_p2a: Stream<P2a<P>, Unbounded, NoTick, Cluster<'a, Acceptor>>,
r_to_acceptors_checkpoint: Stream<
(ClusterId<R>, i32),
Expand All @@ -667,6 +674,8 @@ fn acceptor_p2<'a, P: PaxosPayload, R>(
Singleton<(i32, HashMap<i32, LogValue<P>>), Bounded, Tick, Cluster<'a, Acceptor>>,
Stream<P2b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
) {
let p_to_acceptors_p2a_batch = p_to_acceptors_p2a.tick_batch();

// Get the latest checkpoint sequence per replica
let a_checkpoint_largest_seqs =
r_to_acceptors_checkpoint
Expand All @@ -688,15 +697,14 @@ fn acceptor_p2<'a, P: PaxosPayload, R>(
.continue_if(a_checkpoints_quorum_reached)
.map(q!(|(_sender, seq)| seq))
.min()
.unwrap_or(acceptors.singleton(q!(-1)).latest_tick())
.unwrap_or(acceptors.singleton_each_tick(q!(-1)))
.delta()
.map(q!(|min_seq| CheckpointOrP2a::Checkpoint(min_seq)));
// .inspect(q!(|(min_seq, p2a): &(i32, P2a)| println!("Acceptor new checkpoint: {:?}", min_seq)));

let a_p2as_to_place_in_log = p_to_acceptors_p2a
let a_p2as_to_place_in_log = p_to_acceptors_p2a_batch
.clone()
.tick_batch()
.cross_singleton(a_max_ballot.clone().latest_tick()) // Don't consider p2as if the current ballot is higher
.cross_singleton(a_max_ballot.clone()) // Don't consider p2as if the current ballot is higher
.filter_map(q!(|(p2a, max_ballot)|
if p2a.ballot >= max_ballot {
Some(CheckpointOrP2a::P2a(p2a))
Expand Down Expand Up @@ -739,9 +747,8 @@ fn acceptor_p2<'a, P: PaxosPayload, R>(
}),
);

let a_to_proposers_p2b = p_to_acceptors_p2a
.tick_batch()
.cross_singleton(a_max_ballot.clone().latest_tick())
let a_to_proposers_p2b = p_to_acceptors_p2a_batch
.cross_singleton(a_max_ballot)
.map(q!(|(p2a, max_ballot)| (
p2a.ballot.proposer_id,
P2b {
Expand Down
Loading

0 comments on commit b459ec9

Please sign in to comment.