Skip to content

Commit

Permalink
refactor(paxos): simplify slot computation logic
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Oct 31, 2024
1 parent 0ba4265 commit 1e52942
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 287 deletions.
43 changes: 13 additions & 30 deletions hydroflow_plus_test/src/cluster/paxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ struct P2a<P> {

#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
struct P2b<P> {
victory: bool,
ballot: Ballot,
max_ballot: Ballot,
slot: i32,
value: P,
}
Expand Down Expand Up @@ -111,7 +111,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
i_am_leader_send_timeout,
i_am_leader_check_timeout,
i_am_leader_check_timeout_delay_multiplier,
a_to_proposers_p2b_forward_reference.map(q!(|p2b| p2b.ballot)),
a_to_proposers_p2b_forward_reference.map(q!(|p2b| p2b.max_ballot)),
a_log_forward_reference.map(q!(|(_ckpnt, log)| log.clone())),
);

Expand Down Expand Up @@ -621,14 +621,13 @@ fn p_p2a<'a, P: PaxosPayload>(
let (p_next_slot_complete_cycle, p_next_slot) =
proposers.tick_cycle::<Optional<i32, _, _, _>>();
let p_next_slot_after_reconciling_p1bs = p_max_slot
.unwrap_or(proposers.singleton_each_tick(q!(-1)))
.map(q!(|max_slot| max_slot + 1))
.unwrap_or(proposers.singleton_each_tick(q!(0)))
// .inspect(q!(|max_slot| println!("{} p_max_slot: {:?}", context.current_tick(), max_slot)))
.continue_unless(p_next_slot.clone())
.map(q!(|max_slot| max_slot + 1));
.continue_unless(p_next_slot.clone());

// Send p2as
let p_indexed_payloads = c_to_proposers
.clone()
.tick_batch()
.enumerate()
.cross_singleton(p_next_slot.clone())
Expand All @@ -638,36 +637,25 @@ fn p_p2a<'a, P: PaxosPayload>(
.map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { ballot: Ballot { num: ballot_num, proposer_id: p_id }, slot: next_slot + index as i32, value: payload }));
// .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a)));
let p_to_acceptors_p2a = p_log_to_recommit
.union(p_indexed_payloads)
.union(p_indexed_payloads.clone())
.continue_if(p_is_leader.clone())
.all_ticks()
.broadcast_bincode_interleaved(acceptors);

let p_num_payloads = c_to_proposers.clone().tick_batch().count();
let p_exists_payloads = p_num_payloads
.clone()
.filter(q!(|num_payloads| *num_payloads > 0));
let p_num_payloads = p_indexed_payloads.count();
let p_next_slot_after_sending_payloads = p_num_payloads
.continue_if(p_exists_payloads.clone())
.clone()
.cross_singleton(p_next_slot.clone())
.map(q!(
|(num_payloads, next_slot)| next_slot + num_payloads as i32
));
let p_next_slot_if_no_payloads = p_next_slot.clone().continue_unless(p_exists_payloads);
let p_new_next_slot_calculated = p_next_slot_after_reconciling_p1bs

let p_new_next_slot = p_next_slot_after_reconciling_p1bs
// .inspect(q!(|slot| println!("{} p_new_next_slot_after_reconciling_p1bs: {:?}", context.current_tick(), slot)))
.union(p_next_slot_after_sending_payloads)
// .inspect(q!(|slot| println!("{} p_next_slot_after_sending_payloads: {:?}", context.current_tick(), slot))))
.union(p_next_slot_if_no_payloads)
// .inspect(q!(|slot| println!("{} p_next_slot_if_no_payloads: {:?}", context.current_tick(), slot))))
.continue_if(p_is_leader.clone());
let p_new_next_slot_default = p_is_leader // Default next slot to 0 if there haven't been any payloads at all
.clone()
.continue_unless(p_new_next_slot_calculated.clone())
.map(q!(|_| 0));
// .inspect(q!(|slot| println!("{} p_new_next_slot_default: {:?}", context.current_tick(), slot)));
let p_new_next_slot = p_new_next_slot_calculated.union(p_new_next_slot_default);

p_next_slot_complete_cycle.complete_next_tick(p_new_next_slot);
(p_next_slot, p_to_acceptors_p2a)
}
Expand Down Expand Up @@ -767,8 +755,8 @@ fn acceptor_p2<'a, P: PaxosPayload, R>(
.map(q!(|(p2a, max_ballot)| (
p2a.ballot.proposer_id,
P2b {
victory: p2a.ballot == max_ballot,
ballot: p2a.ballot,
max_ballot,
slot: p2a.slot,
value: p2a.value
}
Expand All @@ -785,13 +773,10 @@ fn p_p2b<'a, P: PaxosPayload>(
) -> Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>> {
let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposers.tick_cycle();
let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposers.tick_cycle();
let p_p2b = a_to_proposers_p2b
.clone()
.tick_batch()
.union(p_persisted_p2bs);
let p_p2b = a_to_proposers_p2b.tick_batch().union(p_persisted_p2bs);
let p_count_matching_p2bs = p_p2b
.clone()
.filter_map(q!(|p2b| if p2b.victory {
.filter_map(q!(|p2b| if p2b.ballot == p2b.max_ballot {
// Only consider p2bs where max ballot = ballot, which means that no one preempted us
Some(((p2b.slot, p2b.ballot), p2b.value))
} else {
Expand All @@ -800,7 +785,6 @@ fn p_p2b<'a, P: PaxosPayload>(
.fold_keyed(
q!(|| (0, Default::default())),
q!(|accum, value| {
// TODO(shadaj): why is sender unused? should we de-dup?
accum.0 += 1;
accum.1 = value;
}),
Expand All @@ -816,7 +800,6 @@ fn p_p2b<'a, P: PaxosPayload>(
let p_to_replicas = p_p2b_quorum_reached
.clone()
.anti_join(p_broadcasted_p2b_slots) // Only tell the replicas about committed values once
.map(q!(|(slot, value)| (slot, value)))
.all_ticks();

let p_p2b_all_commit_slots =
Expand Down
Loading

0 comments on commit 1e52942

Please sign in to comment.