Skip to content

Commit

Permalink
refactor(hydroflow_plus, paxos)!: simplify intervals and split Paxos-…
Browse files Browse the repository at this point in the history
…KV into separate module
  • Loading branch information
shadaj committed Nov 1, 2024
1 parent 70ba22b commit 1b5eb56
Show file tree
Hide file tree
Showing 6 changed files with 516 additions and 517 deletions.
20 changes: 5 additions & 15 deletions hydroflow_plus/src/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,36 +148,26 @@ pub trait Location<'a> {
fn source_interval(
&self,
interval: impl Quoted<'a, Duration> + Copy + 'a,
) -> Optional<(), Unbounded, NoTick, Self>
) -> Stream<tokio::time::Instant, Unbounded, NoTick, Self>
where
Self: Sized,
{
let interval = interval.splice_untyped();

Optional::new(
self.id(),
self.flow_state().clone(),
HfPlusNode::Persist(Box::new(HfPlusNode::Source {
source: HfPlusSource::Interval(interval.into()),
location_kind: self.id(),
})),
)
self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(interval)
)))
}

fn source_interval_delayed(
&self,
delay: impl Quoted<'a, Duration> + Copy + 'a,
interval: impl Quoted<'a, Duration> + Copy + 'a,
) -> Optional<tokio::time::Instant, Unbounded, NoTick, Self>
) -> Stream<tokio::time::Instant, Unbounded, NoTick, Self>
where
Self: Sized,
{
self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
)))
.tick_batch()
.first()
.latest()
}

fn forward_ref<S: CycleCollection<'a, NoTick, Location = Self>>(
Expand Down
1 change: 1 addition & 0 deletions hydroflow_plus_test/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ pub mod many_to_many;
pub mod map_reduce;
pub mod paxos;
pub mod paxos_bench;
pub mod paxos_kv;
pub mod simple_cluster;
pub mod two_pc;
50 changes: 18 additions & 32 deletions hydroflow_plus_test/src/cluster/paxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,33 +56,29 @@ struct P2b<P> {
value: P,
}

#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
#[expect(
clippy::too_many_arguments,
clippy::type_complexity,
reason = "internal paxos code // TODO"
)]
pub fn paxos_core<'a, P: PaxosPayload, R>(
flow: &FlowBuilder<'a>,
r_to_acceptors_checkpoint: impl FnOnce(
&Cluster<'a, Acceptor>,
) -> Stream<
proposers: &Cluster<'a, Proposer>,
acceptors: &Cluster<'a, Acceptor>,
r_to_acceptors_checkpoint: Stream<
(ClusterId<R>, i32),
Unbounded,
NoTick,
Cluster<'a, Acceptor>,
>,
c_to_proposers: impl FnOnce(
&Cluster<'a, Proposer>,
) -> Stream<P, Unbounded, NoTick, Cluster<'a, Proposer>>,
c_to_proposers: Stream<P, Unbounded, NoTick, Cluster<'a, Proposer>>,
f: usize,
i_am_leader_send_timeout: u64,
i_am_leader_check_timeout: u64,
i_am_leader_check_timeout_delay_multiplier: usize,
) -> (
Cluster<'a, Proposer>,
Cluster<'a, Acceptor>,
Stream<(), Unbounded, NoTick, Cluster<'a, Proposer>>,
Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>>,
) {
let proposers = flow.cluster::<Proposer>();
let acceptors = flow.cluster::<Acceptor>();

proposers
.source_iter(q!(["Proposers say hello"]))
.for_each(q!(|s| println!("{}", s)));
Expand All @@ -97,8 +93,8 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
acceptors.tick_forward_ref::<Singleton<(i32, HashMap<i32, LogValue<P>>), _, _, _>>();

let (p_ballot_num, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election(
&proposers,
&acceptors,
proposers,
acceptors,
f,
i_am_leader_send_timeout,
i_am_leader_check_timeout,
Expand All @@ -118,17 +114,15 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
.all_ticks();

let (p_log_to_try_commit, p_max_slot, p_log_holes) =
recommit_after_leader_election(&proposers, p_relevant_p1bs, p_ballot_num.clone(), f);
recommit_after_leader_election(proposers, p_relevant_p1bs, p_ballot_num.clone(), f);

let p_log_to_recommit = p_log_to_try_commit
.union(p_log_holes)
.continue_if(just_became_leader); // Only resend p1b stuff once the moment we become leader.

let c_to_proposers = c_to_proposers(&proposers);

let (p_to_replicas, a_log, a_to_proposers_p2b) = sequence_payload(
&proposers,
&acceptors,
proposers,
acceptors,
c_to_proposers,
r_to_acceptors_checkpoint,
p_ballot_num,
Expand All @@ -142,12 +136,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
a_log_complete_cycle.complete(a_log);
a_to_proposers_p2b_complete_cycle.complete(a_to_proposers_p2b);

(
proposers,
acceptors,
p_to_clients_new_leader_elected,
p_to_replicas,
)
(p_to_clients_new_leader_elected, p_to_replicas)
}

#[expect(
Expand Down Expand Up @@ -350,7 +339,8 @@ fn p_leader_heartbeat<'a>(
)),
q!(Duration::from_secs(i_am_leader_check_timeout)),
)
.latest_tick(),
.tick_batch()
.first(),
);
(p_to_proposers_i_am_leader, p_trigger_election)
}
Expand Down Expand Up @@ -532,9 +522,7 @@ fn sequence_payload<'a, P: PaxosPayload, R>(
proposers: &Cluster<'a, Proposer>,
acceptors: &Cluster<'a, Acceptor>,
c_to_proposers: Stream<P, Unbounded, NoTick, Cluster<'a, Proposer>>,
r_to_acceptors_checkpoint: impl FnOnce(
&Cluster<'a, Acceptor>,
) -> Stream<
r_to_acceptors_checkpoint: Stream<
(ClusterId<R>, i32),
Unbounded,
NoTick,
Expand Down Expand Up @@ -565,8 +553,6 @@ fn sequence_payload<'a, P: PaxosPayload, R>(
);

// Acceptors.
let r_to_acceptors_checkpoint = r_to_acceptors_checkpoint(acceptors);

// p_to_acceptors_p2a.clone().for_each(q!(|p2a: P2a| println!("Acceptor received P2a: {:?}", p2a)));
let (a_log, a_to_proposers_p2b) = acceptor_p2(
a_max_ballot.clone(),
Expand Down
Loading

0 comments on commit 1b5eb56

Please sign in to comment.