Skip to content

Commit

Permalink
refactor(paxos): use usize for slot numbers (#1521)
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj authored Nov 5, 2024
1 parent 16b730c commit 534fe97
Show file tree
Hide file tree
Showing 7 changed files with 369 additions and 284 deletions.
18 changes: 18 additions & 0 deletions hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,22 @@ pub struct RuntimeContext<'a> {
_phantom: PhantomData<&'a mut &'a ()>,
}

impl RuntimeContext<'_> {
pub fn new() -> Self {
Self {
_phantom: PhantomData,
}
}
}

impl Copy for RuntimeContext<'_> {}

impl Default for RuntimeContext<'_> {
fn default() -> Self {
Self::new()
}
}

impl<'a> FreeVariable<&'a Context> for RuntimeContext<'a> {
fn to_tokens(self) -> (Option<TokenStream>, Option<TokenStream>) {
(None, Some(quote!(&context)))
Expand All @@ -68,6 +82,10 @@ impl<ID> HfCompiled<'_, ID> {
pub fn hydroflow_ir(&self) -> &BTreeMap<usize, HydroflowGraph> {
&self.hydroflow_ir
}

pub fn take_ir(self) -> BTreeMap<usize, HydroflowGraph> {
self.hydroflow_ir
}
}

impl<'a> HfCompiled<'a, usize> {
Expand Down
45 changes: 45 additions & 0 deletions hydroflow_plus/src/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,23 @@ pub enum LocationId {
ExternalProcess(usize),
}

impl LocationId {
pub fn raw_id(&self) -> usize {
match self {
LocationId::Process(id) => *id,
LocationId::Cluster(id) => *id,
LocationId::ExternalProcess(id) => *id,
}
}
}

pub trait Location<'a> {
fn id(&self) -> LocationId;

fn flow_state(&self) -> &FlowState;

fn make_from(id: LocationId, flow_state: FlowState) -> Self;

fn spin(&self) -> Stream<(), Unbounded, Self>
where
Self: Sized + NoTick,
Expand Down Expand Up @@ -346,6 +358,17 @@ impl<'a, P> Location<'a> for ExternalProcess<'a, P> {
fn flow_state(&self) -> &FlowState {
&self.flow_state
}

fn make_from(id: LocationId, flow_state: FlowState) -> Self {
match id {
LocationId::ExternalProcess(id) => ExternalProcess {
id,
flow_state,
_phantom: PhantomData,
},
_ => panic!(),
}
}
}

impl<'a, P> ExternalProcess<'a, P> {
Expand Down Expand Up @@ -447,6 +470,17 @@ impl<'a, P> Location<'a> for Process<'a, P> {
fn flow_state(&self) -> &FlowState {
&self.flow_state
}

fn make_from(id: LocationId, flow_state: FlowState) -> Self {
match id {
LocationId::Process(id) => Process {
id,
flow_state,
_phantom: PhantomData,
},
_ => panic!(),
}
}
}

#[repr(transparent)]
Expand Down Expand Up @@ -581,6 +615,17 @@ impl<'a, C> Location<'a> for Cluster<'a, C> {
fn flow_state(&self) -> &FlowState {
&self.flow_state
}

fn make_from(id: LocationId, flow_state: FlowState) -> Self {
match id {
LocationId::Cluster(id) => Cluster {
id,
flow_state,
_phantom: PhantomData,
},
_ => panic!(),
}
}
}

pub trait CanSend<'a, To: Location<'a>>: Location<'a> {
Expand Down
28 changes: 28 additions & 0 deletions hydroflow_plus/src/singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,15 @@ impl<'a, T, W, N: Location<'a>> Singleton<T, W, N> {
}

impl<'a, T, N: Location<'a>> Singleton<T, Bounded, Tick<N>> {
// TODO(shadaj): this is technically incorrect; we should only return the first element of the stream
pub fn into_stream(self) -> Stream<T, Bounded, Tick<N>> {
Stream::new(
self.location_kind,
self.flow_state,
self.ir_node.into_inner(),
)
}

pub fn cross_singleton<Other>(self, other: Other) -> <Self as CrossResult<'a, Other>>::Out
where
Self: CrossResult<'a, Other>,
Expand Down Expand Up @@ -641,6 +650,16 @@ impl<'a, T, N: Location<'a>> Optional<T, Bounded, Tick<N>> {
),
)
}

pub fn into_singleton(self) -> Singleton<Option<T>, Bounded, Tick<N>>
where
T: Clone,
N: NoTick,
{
let self_location = N::make_from(self.location_kind, self.flow_state.clone());
self.map(q!(|v| Some(v)))
.unwrap_or(self_location.singleton_each_tick(q!(None)))
}
}

impl<'a, T, N: Location<'a>> Optional<T, Bounded, Tick<N>> {
Expand Down Expand Up @@ -730,6 +749,15 @@ impl<'a, T, B, N: Location<'a> + NoTick> Optional<T, B, N> {

self.latest_tick().unwrap_or(other.latest_tick()).latest()
}

pub fn into_singleton(self) -> Singleton<Option<T>, Unbounded, N>
where
T: Clone,
{
let self_location = N::make_from(self.location_kind, self.flow_state.clone());
self.map(q!(|v| Some(v)))
.unwrap_or(self_location.singleton(q!(None)))
}
}

impl<'a, T, N: Location<'a> + NoTick> Optional<T, Unbounded, N> {
Expand Down
6 changes: 6 additions & 0 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ impl<'a, L: Location<'a>> Location<'a> for Tick<L> {
fn flow_state(&self) -> &FlowState {
self.l.flow_state()
}

fn make_from(id: LocationId, flow_state: FlowState) -> Self {
Tick {
l: L::make_from(id, flow_state),
}
}
}

/// An infinite stream of elements of type `T`.
Expand Down
52 changes: 24 additions & 28 deletions hydroflow_plus_test/src/cluster/paxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ struct P1b<L> {
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
struct P2a<P> {
ballot: Ballot,
slot: i32,
slot: usize,
value: Option<P>, // might be a re-committed hole
}

#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
struct P2b<P> {
ballot: Ballot,
max_ballot: Ballot,
slot: i32,
slot: usize,
value: Option<P>, // might be a hole
}

Expand All @@ -62,15 +62,15 @@ struct P2b<P> {
pub fn paxos_core<'a, P: PaxosPayload, R>(
proposers: &Cluster<'a, Proposer>,
acceptors: &Cluster<'a, Acceptor>,
r_to_acceptors_checkpoint: Stream<(ClusterId<R>, i32), Unbounded, Cluster<'a, Acceptor>>,
r_to_acceptors_checkpoint: Stream<(ClusterId<R>, usize), Unbounded, Cluster<'a, Acceptor>>,
c_to_proposers: Stream<P, Unbounded, Cluster<'a, Proposer>>,
f: usize,
i_am_leader_send_timeout: u64,
i_am_leader_check_timeout: u64,
i_am_leader_check_timeout_delay_multiplier: usize,
) -> (
Stream<(), Unbounded, Cluster<'a, Proposer>>,
Stream<(i32, Option<P>), Unbounded, Cluster<'a, Proposer>>,
Stream<(usize, Option<P>), Unbounded, Cluster<'a, Proposer>>,
) {
proposers
.source_iter(q!(["Proposers say hello"]))
Expand All @@ -83,7 +83,8 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
let (a_to_proposers_p2b_complete_cycle, a_to_proposers_p2b_forward_reference) =
proposers.forward_ref::<Stream<P2b<P>, _, _>>();
let (a_log_complete_cycle, a_log_forward_reference) =
acceptors.tick_forward_ref::<Singleton<(i32, HashMap<i32, LogValue<P>>), _, _>>();
acceptors
.tick_forward_ref::<Singleton<(Option<usize>, HashMap<usize, LogValue<P>>), _, _>>();

let (p_ballot_num, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election(
proposers,
Expand Down Expand Up @@ -437,12 +438,12 @@ fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
fn recommit_after_leader_election<'a, P: PaxosPayload>(
proposers: &Cluster<'a, Proposer>,
p_relevant_p1bs: Stream<P1b<HashMap<i32, LogValue<P>>>, Bounded, Tick<Cluster<'a, Proposer>>>,
p_relevant_p1bs: Stream<P1b<HashMap<usize, LogValue<P>>>, Bounded, Tick<Cluster<'a, Proposer>>>,
p_ballot_num: Singleton<u32, Bounded, Tick<Cluster<'a, Proposer>>>,
f: usize,
) -> (
Stream<P2a<P>, Bounded, Tick<Cluster<'a, Proposer>>>,
Optional<i32, Bounded, Tick<Cluster<'a, Proposer>>>,
Optional<usize, Bounded, Tick<Cluster<'a, Proposer>>>,
Stream<P2a<P>, Bounded, Tick<Cluster<'a, Proposer>>>,
) {
let p_id = proposers.self_id();
Expand Down Expand Up @@ -520,19 +521,19 @@ fn sequence_payload<'a, P: PaxosPayload, R>(
proposers: &Cluster<'a, Proposer>,
acceptors: &Cluster<'a, Acceptor>,
c_to_proposers: Stream<P, Unbounded, Cluster<'a, Proposer>>,
r_to_acceptors_checkpoint: Stream<(ClusterId<R>, i32), Unbounded, Cluster<'a, Acceptor>>,
r_to_acceptors_checkpoint: Stream<(ClusterId<R>, usize), Unbounded, Cluster<'a, Acceptor>>,

p_ballot_num: Singleton<u32, Bounded, Tick<Cluster<'a, Proposer>>>,
p_is_leader: Optional<bool, Bounded, Tick<Cluster<'a, Proposer>>>,
p_max_slot: Optional<i32, Bounded, Tick<Cluster<'a, Proposer>>>,
p_max_slot: Optional<usize, Bounded, Tick<Cluster<'a, Proposer>>>,

p_log_to_recommit: Stream<P2a<P>, Bounded, Tick<Cluster<'a, Proposer>>>,
f: usize,

a_max_ballot: Singleton<Ballot, Bounded, Tick<Cluster<'a, Acceptor>>>,
) -> (
Stream<(i32, Option<P>), Unbounded, Cluster<'a, Proposer>>,
Singleton<(i32, HashMap<i32, LogValue<P>>), Bounded, Tick<Cluster<'a, Acceptor>>>,
Stream<(usize, Option<P>), Unbounded, Cluster<'a, Proposer>>,
Singleton<(Option<usize>, HashMap<usize, LogValue<P>>), Bounded, Tick<Cluster<'a, Acceptor>>>,
Stream<P2b<P>, Unbounded, Cluster<'a, Proposer>>,
) {
let p_to_acceptors_p2a = p_p2a(
Expand All @@ -552,7 +553,6 @@ fn sequence_payload<'a, P: PaxosPayload, R>(
p_to_acceptors_p2a,
r_to_acceptors_checkpoint,
proposers,
acceptors,
f,
);

Expand All @@ -563,22 +563,22 @@ fn sequence_payload<'a, P: PaxosPayload, R>(

#[derive(Clone)]
enum CheckpointOrP2a<P> {
Checkpoint(i32),
Checkpoint(usize),
P2a(P2a<P>),
}

// Proposer logic to send p2as, outputting the next slot and the p2as to send to acceptors.
fn p_p2a<'a, P: PaxosPayload>(
proposers: &Cluster<'a, Proposer>,
p_max_slot: Optional<i32, Bounded, Tick<Cluster<'a, Proposer>>>,
p_max_slot: Optional<usize, Bounded, Tick<Cluster<'a, Proposer>>>,
c_to_proposers: Stream<P, Unbounded, Cluster<'a, Proposer>>,
p_ballot_num: Singleton<u32, Bounded, Tick<Cluster<'a, Proposer>>>,
p_log_to_recommit: Stream<P2a<P>, Bounded, Tick<Cluster<'a, Proposer>>>,
p_is_leader: Optional<bool, Bounded, Tick<Cluster<'a, Proposer>>>,
acceptors: &Cluster<'a, Acceptor>,
) -> Stream<P2a<P>, Unbounded, Cluster<'a, Acceptor>> {
let p_id = proposers.self_id();
let (p_next_slot_complete_cycle, p_next_slot) = proposers.tick_cycle::<Optional<i32, _, _>>();
let (p_next_slot_complete_cycle, p_next_slot) = proposers.tick_cycle::<Optional<usize, _, _>>();
let p_next_slot_after_reconciling_p1bs = p_max_slot
.map(q!(|max_slot| max_slot + 1))
.unwrap_or(proposers.singleton_each_tick(q!(0)))
Expand All @@ -595,7 +595,7 @@ fn p_p2a<'a, P: PaxosPayload>(
// .inspect(q!(|ballot_num| println!("{} p_indexed_payloads ballot_num: {}", context.current_tick(), ballot_num))))
.map(q!(move |(((index, payload), next_slot), ballot_num)| P2a {
ballot: Ballot { num: ballot_num, proposer_id: p_id },
slot: next_slot + index as i32,
slot: next_slot + index,
value: Some(payload)
}));
// .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a)));
Expand All @@ -609,9 +609,7 @@ fn p_p2a<'a, P: PaxosPayload>(
let p_next_slot_after_sending_payloads = p_num_payloads
.clone()
.cross_singleton(p_next_slot.clone())
.map(q!(
|(num_payloads, next_slot)| next_slot + num_payloads as i32
));
.map(q!(|(num_payloads, next_slot)| next_slot + num_payloads));

let p_new_next_slot = p_next_slot_after_reconciling_p1bs
// .inspect(q!(|slot| println!("{} p_new_next_slot_after_reconciling_p1bs: {:?}", context.current_tick(), slot)))
Expand All @@ -627,12 +625,11 @@ fn p_p2a<'a, P: PaxosPayload>(
fn acceptor_p2<'a, P: PaxosPayload, R>(
a_max_ballot: Singleton<Ballot, Bounded, Tick<Cluster<'a, Acceptor>>>,
p_to_acceptors_p2a: Stream<P2a<P>, Unbounded, Cluster<'a, Acceptor>>,
r_to_acceptors_checkpoint: Stream<(ClusterId<R>, i32), Unbounded, Cluster<'a, Acceptor>>,
r_to_acceptors_checkpoint: Stream<(ClusterId<R>, usize), Unbounded, Cluster<'a, Acceptor>>,
proposers: &Cluster<'a, Proposer>,
acceptors: &Cluster<'a, Acceptor>,
f: usize,
) -> (
Singleton<(i32, HashMap<i32, LogValue<P>>), Bounded, Tick<Cluster<'a, Acceptor>>>,
Singleton<(Option<usize>, HashMap<usize, LogValue<P>>), Bounded, Tick<Cluster<'a, Acceptor>>>,
Stream<P2b<P>, Unbounded, Cluster<'a, Proposer>>,
) {
let p_to_acceptors_p2a_batch = p_to_acceptors_p2a.tick_batch();
Expand All @@ -658,7 +655,6 @@ fn acceptor_p2<'a, P: PaxosPayload, R>(
.continue_if(a_checkpoints_quorum_reached)
.map(q!(|(_sender, seq)| seq))
.min()
.unwrap_or(acceptors.singleton_each_tick(q!(-1)))
.delta()
.map(q!(|min_seq| CheckpointOrP2a::Checkpoint(min_seq)));
// .inspect(q!(|(min_seq, p2a): &(i32, P2a)| println!("Acceptor new checkpoint: {:?}", min_seq)));
Expand All @@ -677,19 +673,19 @@ fn acceptor_p2<'a, P: PaxosPayload, R>(
.union(a_new_checkpoint.into_stream())
.persist()
.fold(
q!(|| (-1, HashMap::new())),
q!(|| (None, HashMap::new())),
q!(|(prev_checkpoint, log), checkpoint_or_p2a| {
match checkpoint_or_p2a {
CheckpointOrP2a::Checkpoint(new_checkpoint) => {
// This is a checkpoint message. Delete all entries up to the checkpoint
for slot in *prev_checkpoint..new_checkpoint {
for slot in (prev_checkpoint.unwrap_or(0))..new_checkpoint {
log.remove(&slot);
}
*prev_checkpoint = new_checkpoint;
*prev_checkpoint = Some(new_checkpoint);
}
CheckpointOrP2a::P2a(p2a) => {
// This is a regular p2a message. Insert it into the log if it is not checkpointed and has a higher ballot than what was there before
if p2a.slot > *prev_checkpoint
if prev_checkpoint.map(|prev| p2a.slot > prev).unwrap_or(true)
&& log
.get(&p2a.slot)
.map(|prev_p2a: &LogValue<_>| p2a.ballot > prev_p2a.ballot)
Expand Down Expand Up @@ -728,7 +724,7 @@ fn p_p2b<'a, P: PaxosPayload>(
proposers: &Cluster<'a, Proposer>,
a_to_proposers_p2b: Stream<P2b<P>, Unbounded, Cluster<'a, Proposer>>,
f: usize,
) -> Stream<(i32, Option<P>), Unbounded, Cluster<'a, Proposer>> {
) -> Stream<(usize, Option<P>), Unbounded, Cluster<'a, Proposer>> {
let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposers.tick_cycle();
let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposers.tick_cycle();
let p_p2b = a_to_proposers_p2b.tick_batch().union(p_persisted_p2bs);
Expand Down
Loading

0 comments on commit 534fe97

Please sign in to comment.