Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(paxos): simplify slot computation logic #1493

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 13 additions & 30 deletions hydroflow_plus_test/src/cluster/paxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ struct P2a<P> {

#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
struct P2b<P> {
victory: bool,
ballot: Ballot,
max_ballot: Ballot,
slot: i32,
value: P,
}
Expand Down Expand Up @@ -111,7 +111,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
i_am_leader_send_timeout,
i_am_leader_check_timeout,
i_am_leader_check_timeout_delay_multiplier,
a_to_proposers_p2b_forward_reference.map(q!(|p2b| p2b.ballot)),
a_to_proposers_p2b_forward_reference.map(q!(|p2b| p2b.max_ballot)),
a_log_forward_reference.map(q!(|(_ckpnt, log)| log.clone())),
);

Expand Down Expand Up @@ -621,14 +621,13 @@ fn p_p2a<'a, P: PaxosPayload>(
let (p_next_slot_complete_cycle, p_next_slot) =
proposers.tick_cycle::<Optional<i32, _, _, _>>();
let p_next_slot_after_reconciling_p1bs = p_max_slot
.unwrap_or(proposers.singleton_each_tick(q!(-1)))
.map(q!(|max_slot| max_slot + 1))
.unwrap_or(proposers.singleton_each_tick(q!(0)))
// .inspect(q!(|max_slot| println!("{} p_max_slot: {:?}", context.current_tick(), max_slot)))
.continue_unless(p_next_slot.clone())
.map(q!(|max_slot| max_slot + 1));
.continue_unless(p_next_slot.clone());

// Send p2as
let p_indexed_payloads = c_to_proposers
.clone()
.tick_batch()
.enumerate()
.cross_singleton(p_next_slot.clone())
Expand All @@ -638,36 +637,25 @@ fn p_p2a<'a, P: PaxosPayload>(
.map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { ballot: Ballot { num: ballot_num, proposer_id: p_id }, slot: next_slot + index as i32, value: payload }));
// .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a)));
let p_to_acceptors_p2a = p_log_to_recommit
.union(p_indexed_payloads)
.union(p_indexed_payloads.clone())
.continue_if(p_is_leader.clone())
.all_ticks()
.broadcast_bincode_interleaved(acceptors);

let p_num_payloads = c_to_proposers.clone().tick_batch().count();
let p_exists_payloads = p_num_payloads
.clone()
.filter(q!(|num_payloads| *num_payloads > 0));
let p_num_payloads = p_indexed_payloads.count();
let p_next_slot_after_sending_payloads = p_num_payloads
.continue_if(p_exists_payloads.clone())
.clone()
.cross_singleton(p_next_slot.clone())
.map(q!(
|(num_payloads, next_slot)| next_slot + num_payloads as i32
));
let p_next_slot_if_no_payloads = p_next_slot.clone().continue_unless(p_exists_payloads);
let p_new_next_slot_calculated = p_next_slot_after_reconciling_p1bs

let p_new_next_slot = p_next_slot_after_reconciling_p1bs
// .inspect(q!(|slot| println!("{} p_new_next_slot_after_reconciling_p1bs: {:?}", context.current_tick(), slot)))
.union(p_next_slot_after_sending_payloads)
// .inspect(q!(|slot| println!("{} p_next_slot_after_sending_payloads: {:?}", context.current_tick(), slot))))
.union(p_next_slot_if_no_payloads)
// .inspect(q!(|slot| println!("{} p_next_slot_if_no_payloads: {:?}", context.current_tick(), slot))))
.continue_if(p_is_leader.clone());
let p_new_next_slot_default = p_is_leader // Default next slot to 0 if there haven't been any payloads at all
.clone()
.continue_unless(p_new_next_slot_calculated.clone())
.map(q!(|_| 0));
// .inspect(q!(|slot| println!("{} p_new_next_slot_default: {:?}", context.current_tick(), slot)));
let p_new_next_slot = p_new_next_slot_calculated.union(p_new_next_slot_default);

p_next_slot_complete_cycle.complete_next_tick(p_new_next_slot);
(p_next_slot, p_to_acceptors_p2a)
}
Expand Down Expand Up @@ -767,8 +755,8 @@ fn acceptor_p2<'a, P: PaxosPayload, R>(
.map(q!(|(p2a, max_ballot)| (
p2a.ballot.proposer_id,
P2b {
victory: p2a.ballot == max_ballot,
ballot: p2a.ballot,
max_ballot,
slot: p2a.slot,
value: p2a.value
}
Expand All @@ -785,13 +773,10 @@ fn p_p2b<'a, P: PaxosPayload>(
) -> Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>> {
let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposers.tick_cycle();
let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposers.tick_cycle();
let p_p2b = a_to_proposers_p2b
.clone()
.tick_batch()
.union(p_persisted_p2bs);
let p_p2b = a_to_proposers_p2b.tick_batch().union(p_persisted_p2bs);
let p_count_matching_p2bs = p_p2b
.clone()
.filter_map(q!(|p2b| if p2b.victory {
.filter_map(q!(|p2b| if p2b.ballot == p2b.max_ballot {
// Only consider p2bs where max ballot = ballot, which means that no one preempted us
Some(((p2b.slot, p2b.ballot), p2b.value))
} else {
Expand All @@ -800,7 +785,6 @@ fn p_p2b<'a, P: PaxosPayload>(
.fold_keyed(
q!(|| (0, Default::default())),
q!(|accum, value| {
// TODO(shadaj): why is sender unused? should we de-dup?
accum.0 += 1;
accum.1 = value;
}),
Expand All @@ -816,7 +800,6 @@ fn p_p2b<'a, P: PaxosPayload>(
let p_to_replicas = p_p2b_quorum_reached
.clone()
.anti_join(p_broadcasted_p2b_slots) // Only tell the replicas about committed values once
.map(q!(|(slot, value)| (slot, value)))
.all_ticks();

let p_p2b_all_commit_slots =
Expand Down
Loading
Loading