Skip to content

Commit

Permalink
refactor(paxos): simplify slot computation logic
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Oct 30, 2024
1 parent ae809a7 commit e3fd984
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 279 deletions.
34 changes: 9 additions & 25 deletions hydroflow_plus_test/src/cluster/paxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,14 +621,13 @@ fn p_p2a<'a, P: PaxosPayload>(
let (p_next_slot_complete_cycle, p_next_slot) =
proposers.tick_cycle::<Optional<i32, _, _, _>>();
let p_next_slot_after_reconciling_p1bs = p_max_slot
.unwrap_or(proposers.singleton_each_tick(q!(-1)))
.map(q!(|max_slot| max_slot + 1))
.unwrap_or(proposers.singleton_each_tick(q!(0)))
// .inspect(q!(|max_slot| println!("{} p_max_slot: {:?}", context.current_tick(), max_slot)))
.continue_unless(p_next_slot.clone())
.map(q!(|max_slot| max_slot + 1));
.continue_unless(p_next_slot.clone());

// Send p2as
let p_indexed_payloads = c_to_proposers
.clone()
.tick_batch()
.enumerate()
.cross_singleton(p_next_slot.clone())
Expand All @@ -638,36 +637,25 @@ fn p_p2a<'a, P: PaxosPayload>(
.map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { ballot: Ballot { num: ballot_num, proposer_id: p_id }, slot: next_slot + index as i32, value: payload }));
// .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a)));
let p_to_acceptors_p2a = p_log_to_recommit
.union(p_indexed_payloads)
.union(p_indexed_payloads.clone())
.continue_if(p_is_leader.clone())
.all_ticks()
.broadcast_bincode_interleaved(acceptors);

let p_num_payloads = c_to_proposers.clone().tick_batch().count();
let p_exists_payloads = p_num_payloads
.clone()
.filter(q!(|num_payloads| *num_payloads > 0));
let p_num_payloads = p_indexed_payloads.count();
let p_next_slot_after_sending_payloads = p_num_payloads
.continue_if(p_exists_payloads.clone())
.clone()
.cross_singleton(p_next_slot.clone())
.map(q!(
|(num_payloads, next_slot)| next_slot + num_payloads as i32
));
let p_next_slot_if_no_payloads = p_next_slot.clone().continue_unless(p_exists_payloads);
let p_new_next_slot_calculated = p_next_slot_after_reconciling_p1bs

let p_new_next_slot = p_next_slot_after_reconciling_p1bs
// .inspect(q!(|slot| println!("{} p_new_next_slot_after_reconciling_p1bs: {:?}", context.current_tick(), slot)))
.union(p_next_slot_after_sending_payloads)
// .inspect(q!(|slot| println!("{} p_next_slot_after_sending_payloads: {:?}", context.current_tick(), slot))))
.union(p_next_slot_if_no_payloads)
// .inspect(q!(|slot| println!("{} p_next_slot_if_no_payloads: {:?}", context.current_tick(), slot))))
.continue_if(p_is_leader.clone());
let p_new_next_slot_default = p_is_leader // Default next slot to 0 if there haven't been any payloads at all
.clone()
.continue_unless(p_new_next_slot_calculated.clone())
.map(q!(|_| 0));
// .inspect(q!(|slot| println!("{} p_new_next_slot_default: {:?}", context.current_tick(), slot)));
let p_new_next_slot = p_new_next_slot_calculated.union(p_new_next_slot_default);

p_next_slot_complete_cycle.complete_next_tick(p_new_next_slot);
(p_next_slot, p_to_acceptors_p2a)
}
Expand Down Expand Up @@ -785,10 +773,7 @@ fn p_p2b<'a, P: PaxosPayload>(
) -> Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>> {
let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposers.tick_cycle();
let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposers.tick_cycle();
let p_p2b = a_to_proposers_p2b
.clone()
.tick_batch()
.union(p_persisted_p2bs);
let p_p2b = a_to_proposers_p2b.tick_batch().union(p_persisted_p2bs);
let p_count_matching_p2bs = p_p2b
.clone()
.filter_map(q!(|p2b| if p2b.victory {
Expand Down Expand Up @@ -816,7 +801,6 @@ fn p_p2b<'a, P: PaxosPayload>(
let p_to_replicas = p_p2b_quorum_reached
.clone()
.anti_join(p_broadcasted_p2b_slots) // Only tell the replicas about committed values once
.map(q!(|(slot, value)| (slot, value)))
.all_ticks();

let p_p2b_all_commit_slots =
Expand Down
Loading

0 comments on commit e3fd984

Please sign in to comment.