Skip to content

Commit

Permalink
quorum: port single group joint commit (tikv#394)
Browse files Browse the repository at this point in the history
Signed-off-by: accelsao <jayzhan211@gmail.com>
  • Loading branch information
310552025atNYCU committed Sep 17, 2020
1 parent d71daff commit 0c53e7b
Show file tree
Hide file tree
Showing 6 changed files with 735 additions and 5 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ slog-async = "2.3.0"
slog-envlogger = "2.1.0"
slog-stdlog = "4"
slog-term = "2.4.0"
anyhow = "1.0.32"
datadriven = { path = "datadriven", version = "0.1.0" }

[[bench]]
name = "benches"
Expand Down
16 changes: 11 additions & 5 deletions src/quorum.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

#[cfg(test)]
pub mod datadriven_test;
pub mod joint;
pub mod majority;

Expand Down Expand Up @@ -28,10 +29,15 @@ pub struct Index {
impl Display for Index {
#[inline]
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
if self.index != u64::MAX {
write!(f, "[{}]{}", self.group_id, self.index)
} else {
write!(f, "[{}]∞", self.group_id)
match self.group_id {
0 => match self.index {
u64::MAX => write!(f, "∞"),
index => write!(f, "{}", index),
},
group_id => match self.index {
u64::MAX => write!(f, "[{}]∞", group_id),
index => write!(f, "[{}]{}", group_id, index),
},
}
}
}
Expand Down
141 changes: 141 additions & 0 deletions src/quorum/datadriven_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use crate::quorum::{AckIndexer, Index};
use crate::{default_logger, HashSet, JointConfig, MajorityConfig};
use datadriven::{run_test, TestData};

fn test_quorum(data: &TestData) -> String {
// Two majority configs. The first one is always used (though it may
// be empty) and the second one is used iff joint is true.
let mut joint = false;
let mut ids: Vec<u64> = Vec::new();
let mut idsj: Vec<u64> = Vec::new();

// The committed indexes for the nodes in the config in the order in
// which they appear in (ids,idsj), without repetition. An underscore
// denotes an omission (i.e. no information for this voter); this is
// different from 0.
//
// For example,
// cfg=(1,2) cfgj=(2,3,4) idxs=(_,5,_,7) initializes the idx for voter 2
// to 5 and that for voter 4 to 7 (and no others).
//
// cfgj=zero is specified to instruct the test harness to treat cfgj
// as zero instead of not specified (i.e. it will trigger a joint
// quorum test instead of a majority quorum test for cfg only).
let mut idxs: Vec<Index> = Vec::new();

for arg in &data.cmd_args {
for val in &arg.vals {
match arg.key.as_str() {
"cfg" => {
let n: u64 = val.parse().expect("type of n should be u64");
ids.push(n);
}
"cfgj" => {
joint = true;

if val == "zero" {
assert_eq!(arg.vals.len(), 1, "cannot mix 'zero' into configuration")
} else {
let n: u64 = val.parse().expect("type of n should be u64");
idsj.push(n);
}
}
"idx" => {
let mut n: u64 = 0;
if val != "_" {
n = val.parse().expect("type of n should be u64");
if n == 0 {
panic!("use '_' as 0, check {}", data.pos)
}
}
idxs.push(Index {
index: n,
group_id: 0,
});
}
_ => {
panic!("unknown arg: {}", arg.key);
}
}
}
}

let ids_set: HashSet<u64> = ids.iter().cloned().collect();
let idsj_set: HashSet<u64> = idsj.iter().cloned().collect();

// Build the two majority configs.
let c = MajorityConfig::new(ids_set);
let cj = MajorityConfig::new(idsj_set);

let make_lookuper = |idxs: &[Index], ids: &[u64], idsj: &[u64]| -> AckIndexer {
let mut l = AckIndexer::default();
// next to consume from idxs
let mut p: usize = 0;
for id in ids.iter().chain(idsj) {
if !l.contains_key(id) && p < idxs.len() {
l.insert(*id, idxs[p]);
p += 1;
}
}

// Zero entries are created by _ placeholders; we don't want
// them in the lookuper because "no entry" is different from
// "zero entry". Note that we prevent tests from specifying
// zero commit indexes, so that there's no confusion between
// the two concepts.
l.retain(|_, index| index.index > 0);
l
};

// verify length of voters
let input = idxs.len();
let voters = JointConfig::new_joint_from_majorities(c.clone(), cj.clone())
.ids()
.len();

if voters != input {
return format!(
"error: mismatched input (explicit or _) for voters {:?}: {:?}",
voters, input
);
}

// buffer for expected value
let mut buf = String::new();

match data.cmd.as_str() {
"committed" => {
let use_group_commit = false;

let l = make_lookuper(&idxs, &ids, &idsj);

// Branch based on whether this is a majority or joint quorum
// test case.
if joint {
let cc = JointConfig::new_joint_from_majorities(c.clone(), cj.clone());
buf.push_str(&cc.describe(&l));
let idx = cc.committed_index(use_group_commit, &l);
// Interchanging the majorities shouldn't make a difference. If it does, print.
let a_idx = JointConfig::new_joint_from_majorities(cj, c)
.committed_index(use_group_commit, &l);
if a_idx != idx {
buf.push_str(&format!("{} <-- via symmetry\n", a_idx.0));
}
buf.push_str(&format!("{}\n", idx.0));
} else {
// TODO(accelsao): port majority commit
}
}
_ => {
panic!("unknown command: {}", data.cmd);
}
}
buf
}

#[test]
fn test_data_driven_quorum() -> anyhow::Result<()> {
let logger = default_logger();
run_test("src/quorum/testdata", test_quorum, false, &logger)?;
Ok(())
}
15 changes: 15 additions & 0 deletions src/quorum/joint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ impl Configuration {
}
}

#[cfg(test)]
pub(crate) fn new_joint_from_majorities(
incoming: MajorityConfig,
outgoing: MajorityConfig,
) -> Self {
Self { incoming, outgoing }
}

/// Creates an empty configuration with given capacity.
pub fn with_capacity(cap: usize) -> Configuration {
Configuration {
Expand Down Expand Up @@ -80,4 +88,11 @@ impl Configuration {
pub fn contains(&self, id: u64) -> bool {
self.incoming.contains(&id) || self.outgoing.contains(&id)
}

/// Describe returns a (multi-line) representation of the commit indexes for the
/// given lookuper.
#[cfg(test)]
pub(crate) fn describe(&self, l: &impl AckedIndexer) -> String {
MajorityConfig::new(self.ids().iter().collect()).describe(l)
}
}
85 changes: 85 additions & 0 deletions src/quorum/majority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use super::{AckedIndexer, Index, VoteResult};
use crate::{DefaultHashBuilder, HashSet};

use std::mem::MaybeUninit;
use std::ops::{Deref, DerefMut};
use std::{cmp, slice, u64};
Expand Down Expand Up @@ -130,6 +131,90 @@ impl Configuration {
VoteResult::Lost
}
}

/// Describe returns a (multi-line) representation of the commit indexes for the
/// given lookuper.
/// Including `Index`,`Id` and the number of smaller index (represented as the bar)
///
/// Print `?` if `Index` is not exist.
///
/// e.g.
/// ```txt
/// idx
/// x> 100 (id=1)
/// xx> 101 (id=2)
/// > 99 (id=3)
/// 100
/// ```
#[cfg(test)]
pub(crate) fn describe(&self, l: &impl AckedIndexer) -> String {
use std::fmt::Write;

let n = self.voters.len();
if n == 0 {
return "<empty majority quorum>".to_string();
}

struct Tup {
id: u64,
idx: Option<Index>,
// length of bar displayed for this Tup
bar: usize,
}

// Below, populate .bar so that the i-th largest commit index has bar i (we
// plot this as sort of a progress bar). The actual code is a bit more
// complicated and also makes sure that equal index => equal bar.

let mut info = Vec::with_capacity(n);

for &id in &self.voters {
let idx = l.acked_index(id);
info.push(Tup { id, idx, bar: 0 })
}

info.sort_by(|a, b| {
(a.idx.unwrap_or_default().index, a.id).cmp(&(b.idx.unwrap_or_default().index, b.id))
});

for i in 0..n {
if i > 0
&& info[i - 1].idx.unwrap_or_default().index < info[i].idx.unwrap_or_default().index
{
info[i].bar = i;
}
}

info.sort_by(|a, b| a.id.cmp(&b.id));

let mut buf = String::new();
buf.push_str(" ".repeat(n).as_str());
buf.push_str(" idx\n");

for tup in info {
match tup.idx {
Some(idx) => {
buf.push_str("x".repeat(tup.bar).as_str());
buf.push('>');
buf.push_str(" ".repeat(n - tup.bar).as_str());
writeln!(buf, " {:>5} (id={})", format!("{}", idx), tup.id)
.expect("Error occurred while trying to write in String");
}
None => {
buf.push('?');
buf.push_str(" ".repeat(n).as_str());
writeln!(
buf,
" {:>5} (id={})",
format!("{}", Index::default()),
tup.id
)
.expect("Error occurred while trying to write in String");
}
}
}
buf
}
}

impl Deref for Configuration {
Expand Down
Loading

0 comments on commit 0c53e7b

Please sign in to comment.