refactor(paxos): make Paxos-KV generic (#1517)
shadaj authored Nov 3, 2024
1 parent e5b456b commit 0a5abab
Showing 5 changed files with 317 additions and 285 deletions.
8 changes: 8 additions & 0 deletions hydroflow_plus/src/persist_pullup.rs
@@ -43,6 +43,14 @@ fn persist_pullup_node(
input: behind_persist,
})),

HfPlusNode::FilterMap {
f,
input: box HfPlusNode::Persist(behind_persist),
} => HfPlusNode::Persist(Box::new(HfPlusNode::FilterMap {
f,
input: behind_persist,
})),

HfPlusNode::FlatMap {
f,
input: box HfPlusNode::Persist(behind_persist),
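
The new FilterMap arm extends the same persist-pullup rule already applied to Map above and FlatMap below. A plain-Rust model of why the rewrite is sound (not the hydroflow_plus IR; Persist is approximated here as replaying all input seen so far):

// Because `filter_map` is stateless and element-wise, applying it to a
// replayed stream equals replaying its per-tick outputs, so the
// optimizer may hoist `Persist` above `FilterMap`.
fn persist<T: Clone>(history: &mut Vec<T>, tick_input: Vec<T>) -> Vec<T> {
    history.extend(tick_input);
    history.clone()
}

fn main() {
    let f = |x: i32| if x % 2 == 0 { Some(x * 10) } else { None };
    let (mut h_a, mut h_b) = (Vec::new(), Vec::new());
    for tick in [vec![1, 2], vec![3, 4]] {
        // FilterMap { input: Persist(..) }
        let a: Vec<i32> = persist(&mut h_a, tick.clone()).into_iter().filter_map(f).collect();
        // Persist(FilterMap { .. })
        let b = persist(&mut h_b, tick.into_iter().filter_map(f).collect());
        assert_eq!(a, b); // identical output on every tick
    }
}
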
73 changes: 40 additions & 33 deletions hydroflow_plus_test/src/cluster/paxos.rs
@@ -11,10 +11,8 @@ use tokio::time::Instant;
pub struct Proposer {}
pub struct Acceptor {}

pub trait PaxosPayload:
Serialize + DeserializeOwned + PartialEq + Eq + Default + Clone + Debug
{
}
pub trait PaxosPayload: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug {}
impl<T: Serialize + DeserializeOwned + PartialEq + Eq + Clone + Debug> PaxosPayload for T {}
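
With the blanket impl, downstream code no longer writes manual `impl PaxosPayload for ...` blocks, and dropping the `Default` bound means payloads no longer need a placeholder value (holes are now represented as `None` instead). A sketch of the pattern, assuming serde and the trait above are in scope; `Transfer` is a hypothetical payload type:

use serde::{Deserialize, Serialize};

// Deriving the listed traits is all that is required; the blanket impl
// supplies PaxosPayload automatically.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
struct Transfer { from: u32, to: u32, amount: u64 }

fn requires_payload<P: PaxosPayload>() {}

fn check() {
    requires_payload::<Transfer>(); // compiles purely via the blanket impl
}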

#[derive(Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Hash)]
pub struct Ballot {
@@ -31,7 +29,7 @@ struct P1a {
#[derive(Serialize, Deserialize, Clone, Debug)]
struct LogValue<P> {
ballot: Ballot,
value: P,
value: Option<P>, // might be a hole
}

#[derive(Serialize, Deserialize, Clone, Debug)]
@@ -45,15 +43,15 @@ struct P1b<L> {
struct P2a<P> {
ballot: Ballot,
slot: i32,
value: P,
value: Option<P>, // might be a re-committed hole
}

#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
struct P2b<P> {
ballot: Ballot,
max_ballot: Ballot,
slot: i32,
value: P,
value: Option<P>, // might be a hole
}

#[expect(
Expand All @@ -77,7 +75,7 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
i_am_leader_check_timeout_delay_multiplier: usize,
) -> (
Stream<(), Unbounded, NoTick, Cluster<'a, Proposer>>,
Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>>,
Stream<(i32, Option<P>), Unbounded, NoTick, Cluster<'a, Proposer>>,
) {
proposers
.source_iter(q!(["Proposers say hello"]))
@@ -456,28 +454,33 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>(

let p_p1b_highest_entries_and_count = p_relevant_p1bs
.flat_map(q!(|p1b| p1b.accepted.into_iter())) // Convert HashMap log back to stream
.fold_keyed(q!(|| (0, LogValue { ballot: Ballot { num: 0, proposer_id: ClusterId::from_raw(0) }, value: Default::default() })), q!(|curr_entry, new_entry| {
let same_values = new_entry.value == curr_entry.1.value;
let higher_ballot = new_entry.ballot > curr_entry.1.ballot;
// Increment count if the values are the same
if same_values {
curr_entry.0 += 1;
}
// Replace the ballot with the largest one
if higher_ballot {
curr_entry.1.ballot = new_entry.ballot;
// Replace the value with the one from the largest ballot, if necessary
if !same_values {
curr_entry.0 = 1;
curr_entry.1.value = new_entry.value;
.fold_keyed::<(usize, Option<LogValue<P>>), _, _>(q!(|| (0, None)), q!(|curr_entry, new_entry| {
if let Some(curr_entry_payload) = &mut curr_entry.1 {
let same_values = new_entry.value == curr_entry_payload.value;
let higher_ballot = new_entry.ballot > curr_entry_payload.ballot;
// Increment count if the values are the same
if same_values {
curr_entry.0 += 1;
}
// Replace the ballot with the largest one
if higher_ballot {
curr_entry_payload.ballot = new_entry.ballot;
// Replace the value with the one from the largest ballot, if necessary
if !same_values {
curr_entry.0 = 1;
curr_entry_payload.value = new_entry.value;
}
}
} else {
*curr_entry = (1, Some(new_entry));
}
}));
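
Seeding the fold with `(0, None)` instead of a `Default::default()` placeholder is what allows the `Default` bound to be dropped from PaxosPayload, and it also avoids miscounting when a real payload happens to equal the placeholder. A standalone sketch of the same accumulator logic, with a simplified ballot:

// Per slot: keep the value from the highest ballot seen and count how
// many acceptors reported exactly that value (simplified from the diff).
#[derive(Clone, PartialEq, Debug)]
struct LogValue<P> { ballot: u32, value: Option<P> }

fn fold_entry<P: Clone + PartialEq>(curr: &mut (usize, Option<LogValue<P>>), new: LogValue<P>) {
    if let Some(payload) = &mut curr.1 {
        let same_values = new.value == payload.value;
        if same_values {
            curr.0 += 1; // one more acceptor agrees on this value
        }
        if new.ballot > payload.ballot {
            payload.ballot = new.ballot;
            if !same_values {
                curr.0 = 1; // higher ballot wins: restart the count
                payload.value = new.value;
            }
        }
    } else {
        *curr = (1, Some(new)); // first entry seen for this slot
    }
}
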
let p_log_to_try_commit = p_p1b_highest_entries_and_count
.clone()
.cross_singleton(p_ballot_num.clone())
.filter_map(q!(
move |((slot, (count, entry)), ballot_num)| if count <= f as u32 {
.filter_map(q!(move |((slot, (count, entry)), ballot_num)| {
let entry = entry.unwrap();
if count <= f {
Some(P2a {
ballot: Ballot {
num: ballot_num,
@@ -489,7 +492,7 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>(
} else {
None
}
));
}));
let p_max_slot = p_p1b_highest_entries_and_count
.clone()
.map(q!(|(slot, _)| slot))
@@ -508,7 +511,7 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>(
proposer_id: p_id
},
slot,
value: Default::default()
value: None
}));
(p_log_to_try_commit, p_max_slot, p_log_holes)
}
@@ -538,7 +541,7 @@ fn sequence_payload<'a, P: PaxosPayload, R>(

a_max_ballot: Singleton<Ballot, Bounded, Tick, Cluster<'a, Acceptor>>,
) -> (
Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>>,
Stream<(i32, Option<P>), Unbounded, NoTick, Cluster<'a, Proposer>>,
Singleton<(i32, HashMap<i32, LogValue<P>>), Bounded, Tick, Cluster<'a, Acceptor>>,
Stream<P2b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
) {
@@ -563,7 +566,6 @@ fn sequence_payload<'a, P: PaxosPayload, R>(
f,
);

// End tell clients that leader election has completed
let p_to_replicas = p_p2b(proposers, a_to_proposers_p2b.clone(), f);

(p_to_replicas, a_log, a_to_proposers_p2b)
@@ -602,7 +604,11 @@ fn p_p2a<'a, P: PaxosPayload>(
// .inspect(q!(|next| println!("{} p_indexed_payloads next slot: {}", context.current_tick(), next))))
.cross_singleton(p_ballot_num.clone())
// .inspect(q!(|ballot_num| println!("{} p_indexed_payloads ballot_num: {}", context.current_tick(), ballot_num))))
.map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { ballot: Ballot { num: ballot_num, proposer_id: p_id }, slot: next_slot + index as i32, value: payload }));
.map(q!(move |(((index, payload), next_slot), ballot_num)| P2a {
ballot: Ballot { num: ballot_num, proposer_id: p_id },
slot: next_slot + index as i32,
value: Some(payload)
}));
// .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a)));
let p_to_acceptors_p2a = p_log_to_recommit
.union(p_indexed_payloads.clone())
@@ -738,7 +744,7 @@ fn p_p2b<'a, P: PaxosPayload>(
proposers: &Cluster<'a, Proposer>,
a_to_proposers_p2b: Stream<P2b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
f: usize,
) -> Stream<(i32, P), Unbounded, NoTick, Cluster<'a, Proposer>> {
) -> Stream<(i32, Option<P>), Unbounded, NoTick, Cluster<'a, Proposer>> {
let (p_broadcasted_p2b_slots_complete_cycle, p_broadcasted_p2b_slots) = proposers.tick_cycle();
let (p_persisted_p2bs_complete_cycle, p_persisted_p2bs) = proposers.tick_cycle();
let p_p2b = a_to_proposers_p2b.tick_batch().union(p_persisted_p2bs);
@@ -751,12 +757,13 @@ fn p_p2b<'a, P: PaxosPayload>(
None
}))
.fold_keyed(
q!(|| (0, Default::default())),
q!(|| (0, None)),
q!(|accum, value| {
accum.0 += 1;
accum.1 = value;
accum.1 = Some(value);
}),
);
)
.map(q!(|(k, (count, v))| (k, (count, v.unwrap()))));
let p_p2b_quorum_reached =
p_count_matching_p2bs
.clone()
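
The same `(count, Option<_>)` idiom replaces `Default::default()` in p_p2b; the trailing `unwrap` is safe because `fold_keyed` only emits keys that folded in at least one element. A plain-Rust model of that counting step (not the hydroflow_plus operators):

use std::collections::HashMap;
use std::hash::Hash;

// Count P2b votes per key, then strip the Option once counting is done.
fn count_matching<K: Hash + Eq, V>(items: impl Iterator<Item = (K, V)>) -> HashMap<K, (usize, V)> {
    let mut acc: HashMap<K, (usize, Option<V>)> = HashMap::new();
    for (k, v) in items {
        let entry = acc.entry(k).or_insert((0, None));
        entry.0 += 1;
        entry.1 = Some(v);
    }
    // Every key present here received >= 1 element, so unwrap cannot panic.
    acc.into_iter().map(|(k, (n, v))| (k, (n, v.unwrap()))).collect()
}
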
25 changes: 12 additions & 13 deletions hydroflow_plus_test/src/cluster/paxos_bench.rs
@@ -6,7 +6,7 @@ use hydroflow_plus::*;
use stageleft::*;

use super::paxos::{Acceptor, Proposer};
use super::paxos_kv::{paxos_kv, KvPayload, Replica, SequencedKv};
use super::paxos_kv::{paxos_kv, KvPayload, Replica};

pub struct Client {}

@@ -58,10 +58,7 @@ pub fn paxos_bench<'a>(
.map(q!(|(leader_id, _)| leader_id)),
);
processed_payloads
.map(q!(|payload| (
ClusterId::from_raw(payload.kv.value.parse::<u32>().unwrap()),
payload
)))
.map(q!(|payload| (payload.value, payload)))
.send_bincode(&clients)
},
num_clients_per_node,
@@ -82,9 +79,14 @@ fn bench_client<'a>(
Cluster<'a, Client>,
>,
transaction_cycle: impl FnOnce(
Stream<(ClusterId<Proposer>, KvPayload), Unbounded, NoTick, Cluster<'a, Client>>,
Stream<
(ClusterId<Proposer>, KvPayload<u32, ClusterId<Client>>),
Unbounded,
NoTick,
Cluster<'a, Client>,
>,
) -> Stream<
(ClusterId<Replica>, SequencedKv),
(ClusterId<Replica>, KvPayload<u32, ClusterId<Client>>),
Unbounded,
NoTick,
Cluster<'a, Client>,
@@ -112,7 +114,7 @@ fn bench_client<'a>(
leader_ballot,
KvPayload {
key: i as u32,
value: c_id.raw_id.to_string()
value: c_id
}
)
)));
@@ -126,7 +128,7 @@ fn bench_client<'a>(
let c_received_payloads = transaction_results
.tick_batch()
.map(q!(|(sender, replica_payload)| (
replica_payload.kv.key,
replica_payload.key,
sender
)))
.union(c_pending_quorum_payloads);
@@ -154,10 +156,7 @@ fn bench_client<'a>(
.cross_singleton(current_leader.clone().latest_tick())
.map(q!(move |(key, cur_leader)| (
cur_leader,
KvPayload {
key,
value: c_id.raw_id.to_string()
}
KvPayload { key, value: c_id }
)));
c_to_proposers_complete_cycle.complete(
c_new_payloads_when_leader_elected
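
The bench harness previously stringified the client's raw id into the payload value and parsed it back to route responses; with KvPayload now generic over the value type, the typed ClusterId travels in the payload itself. A mock-type sketch of the difference (stand-ins for the hydroflow_plus types, for illustration only):

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ClusterId(u32);

struct KvPayload<K, V> { key: K, value: V }

fn route_old(p: &KvPayload<u32, String>) -> ClusterId {
    ClusterId(p.value.parse::<u32>().unwrap()) // stringly typed: can panic
}

fn route_new(p: &KvPayload<u32, ClusterId>) -> ClusterId {
    p.value // infallible field read, no allocation or parsing
}
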
72 changes: 42 additions & 30 deletions hydroflow_plus_test/src/cluster/paxos_kv.rs
@@ -1,55 +1,65 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;

use hydroflow_plus::*;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use stageleft::*;

use super::paxos::{paxos_core, Acceptor, PaxosPayload, Proposer};
use super::paxos::{paxos_core, Acceptor, Proposer};

pub struct Replica {}

#[derive(Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
pub struct KvPayload {
pub key: u32,
pub value: String,
}
pub trait KvKey: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug {}
impl<K: Serialize + DeserializeOwned + Hash + Eq + Clone + Debug> KvKey for K {}

impl Default for KvPayload {
fn default() -> Self {
Self {
key: 0,
value: "".to_string(),
}
}
}
pub trait KvValue: Serialize + DeserializeOwned + Eq + Clone + Debug {}
impl<V: Serialize + DeserializeOwned + Eq + Clone + Debug> KvValue for V {}

impl PaxosPayload for KvPayload {}
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct KvPayload<K, V> {
pub key: K,
pub value: V,
}

#[derive(Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
pub struct SequencedKv {
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct SequencedKv<K, V> {
// Note: Important that seq is the first member of the struct for sorting
pub seq: i32,
pub kv: KvPayload,
pub kv: Option<KvPayload<K, V>>,
}

impl<K: KvKey, V: KvValue> Ord for SequencedKv<K, V> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.seq.cmp(&other.seq)
}
}

impl<K: KvKey, V: KvValue> PartialOrd for SequencedKv<K, V> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
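
Deriving Ord here would impose `K: Ord` and `V: Ord` bounds that KvKey and KvValue do not carry; implementing it by `seq` alone keeps the generic bounds minimal while preserving the "order by sequence number" behavior replicas rely on. A sketch, assuming serde and the types above are in scope; `Opaque` is a hypothetical value type:

// A value type with no Ord impl still sorts, because SequencedKv
// compares purely on seq.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
struct Opaque(u32);

fn demo(mut buffer: Vec<SequencedKv<u32, Opaque>>) {
    buffer.sort(); // uses the manual Ord: orders by seq alone
}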

#[expect(
clippy::type_complexity,
clippy::too_many_arguments,
reason = "internal paxos code // TODO"
)]
pub fn paxos_kv<'a>(
pub fn paxos_kv<'a, K: KvKey, V: KvValue>(
proposers: &Cluster<'a, Proposer>,
acceptors: &Cluster<'a, Acceptor>,
replicas: &Cluster<'a, Replica>,
c_to_proposers: Stream<KvPayload, Unbounded, NoTick, Cluster<'a, Proposer>>,
c_to_proposers: Stream<KvPayload<K, V>, Unbounded, NoTick, Cluster<'a, Proposer>>,
f: usize,
i_am_leader_send_timeout: u64,
i_am_leader_check_timeout: u64,
i_am_leader_check_timeout_delay_multiplier: usize,
checkpoint_frequency: usize,
) -> (
Stream<(), Unbounded, NoTick, Cluster<'a, Proposer>>,
Stream<SequencedKv, Unbounded, NoTick, Cluster<'a, Replica>>,
Stream<KvPayload<K, V>, Unbounded, NoTick, Cluster<'a, Replica>>,
) {
let (r_to_acceptors_checkpoint_complete_cycle, r_to_acceptors_checkpoint) =
replicas.forward_ref::<Stream<_, _, _, _>>();
@@ -80,13 +90,13 @@ pub fn paxos_kv<'a>(

// Replicas. All relations for replicas will be prefixed with r. Expects ReplicaPayload on p_to_replicas, outputs a stream of (client address, ReplicaPayload) after processing.
#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
pub fn replica<'a>(
pub fn replica<'a, K: KvKey, V: KvValue>(
replicas: &Cluster<'a, Replica>,
p_to_replicas: Stream<SequencedKv, Unbounded, NoTick, Cluster<'a, Replica>>,
p_to_replicas: Stream<SequencedKv<K, V>, Unbounded, NoTick, Cluster<'a, Replica>>,
checkpoint_frequency: usize,
) -> (
Stream<i32, Unbounded, NoTick, Cluster<'a, Replica>>,
Stream<SequencedKv, Unbounded, NoTick, Cluster<'a, Replica>>,
Stream<KvPayload<K, V>, Unbounded, NoTick, Cluster<'a, Replica>>,
) {
let (r_buffered_payloads_complete_cycle, r_buffered_payloads) = replicas.tick_cycle();
// p_to_replicas.inspect(q!(|payload: ReplicaPayload| println!("Replica received payload: {:?}", payload)));
@@ -139,10 +149,10 @@ pub fn replica<'a>(
let r_kv_store = r_processable_payloads
.clone()
.persist() // Optimization: all_ticks() + fold() = fold<static>, where the state of the previous fold is saved and persisted values are deleted.
.fold(q!(|| (HashMap::<u32, String>::new(), -1)), q!(|state, payload| {
let kv_store = &mut state.0;
let last_seq = &mut state.1;
kv_store.insert(payload.kv.key, payload.kv.value);
.fold(q!(|| (HashMap::new(), -1)), q!(|(kv_store, last_seq), payload| {
if let Some(kv) = payload.kv {
kv_store.insert(kv.key, kv.value);
}
debug_assert!(payload.seq == *last_seq + 1, "Hole in log between seq {} and {}", *last_seq, payload.seq);
*last_seq = payload.seq;
// println!("Replica kv store: {:?}", kv_store);
@@ -172,6 +182,8 @@ pub fn replica<'a>(
r_checkpointed_seqs_complete_cycle.complete_next_tick(r_checkpoint_seq_new.clone());

// Tell clients that the payload has been committed. All ReplicaPayloads contain the client's machine ID (to string) as value.
let r_to_clients = p_to_replicas;
(r_checkpoint_seq_new.all_ticks(), r_to_clients)
(
r_checkpoint_seq_new.all_ticks(),
p_to_replicas.filter_map(q!(|t| t.kv)),
)
}
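
A plain-Rust model of the updated replica fold: a hole (kv == None) advances last_seq without touching the store, and the final filter_map above means clients are only ever told about real payloads, never the no-ops used to fill holes:

use std::collections::HashMap;
use std::hash::Hash;

// Simplified replica state machine: (kv_store, last applied seq).
fn apply<K: Hash + Eq, V>(state: &mut (HashMap<K, V>, i32), seq: i32, kv: Option<(K, V)>) {
    let (kv_store, last_seq) = state;
    if let Some((k, v)) = kv {
        kv_store.insert(k, v); // real command: apply it to the store
    } // None: a re-committed hole -- skip the store, still advance the seq
    debug_assert!(seq == *last_seq + 1, "hole in log between {} and {}", *last_seq, seq);
    *last_seq = seq;
}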