Skip to content

Commit

Permalink
Feature: feature flag "single-term-leader": standard raft mode
Browse files Browse the repository at this point in the history
With this feature on: only one leader can be elected in each term, but
reduce LogId size from `LogId:{term, node_id, index}` to `LogId{term, index}`.

Add `CommittedLeaderId` as the leader-id type used in `LogId`:
The leader-id used in `LogId` can be different(smaller) from leader-id used
in `Vote`, depending on `LeaderId` definition.
`CommittedLeaderId` is the smallest data that can identify a leader
after the leadership is granted by a quorum(committed).

Change: Vote stores a LeaderId in it.

```rust
// Before
pub struct Vote<NID> {
    pub term: u64,
    pub node_id: NID,
    pub committed: bool,
}

// After
pub struct Vote<NID> {
    #[cfg_attr(feature = "serde", serde(flatten))]
    pub leader_id: LeaderId<NID>,

    pub committed: bool,
}
```

Upgrade tip:

If you manually serialize `Vote`, i.e. without using
`serde`, the serialization part should be rewritten.

Otherwise, nothing needs to be done.

- Fix: #660
  • Loading branch information
drmingdrmer committed Feb 14, 2023
1 parent 24c212a commit 4a85ee9
Show file tree
Hide file tree
Showing 75 changed files with 1,195 additions and 685 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,27 @@ jobs:
- toolchain: "stable"
store_defensive: "off"
send_delay: "0"
features: ""

- toolchain: "nightly"
store_defensive: "on"
send_delay: "30"
features: ""

- toolchain: "nightly"
store_defensive: "on"
send_delay: "0"
features: ""

- toolchain: "nightly"
store_defensive: "off"
send_delay: "0"
features: ""

- toolchain: "nightly"
store_defensive: "on"
send_delay: "0"
features: "single-term-leader"

steps:
- name: Setup | Checkout
Expand All @@ -107,6 +119,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: --features "${{ matrix.features }}"
env:
# Parallel tests block each other and result in timeout.
RUST_TEST_THREADS: 2
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ test:
cargo test
cargo test --features bt
cargo test --features serde
cargo test --features single-term-leader
cargo test --manifest-path examples/raft-kv-memstore/Cargo.toml
cargo test --manifest-path examples/raft-kv-rocksdb/Cargo.toml

Expand Down
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ Currently, openraft is the consensus engine of meta-service cluster in [databend

# Roadmap

- [x] [Extended joint membership](https://datafuselabs.github.io/openraft/extended-membership)
- [x] **2022-10-31** [Extended joint membership](https://datafuselabs.github.io/openraft/extended-membership)

- [ ] Reduce the complexity of vote and pre-vote: [get rid of pre-vote RPC](https://github.com/datafuselabs/openraft/discussions/15);
- [x] **2023-02-14** Reduce confliction rate when electing;
See: [Openraft Vote design](https://datafuselabs.github.io/openraft/vote);
Or use standard raft mode with [feature flag `single-term-leader`](https://datafuselabs.github.io/openraft/feature-flags).

- [ ] Reduce confliction rate when electing;
Allow leadership to be taken in one term by a node with greater node-id.
- [ ] Reduce the complexity of vote and pre-vote: [get rid of pre-vote RPC](https://github.com/datafuselabs/openraft/discussions/15);

- [ ] Support flexible quorum, e.g.:[Hierarchical Quorums](https://zookeeper.apache.org/doc/r3.5.9/zookeeperHierarchicalQuorums.html)

Expand Down
2 changes: 2 additions & 0 deletions guide/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

- [Metrics](./metrics.md)

- [Feature flags](./feature-flags.md)

- [Internal](./internal.md)
- [Architecture](./architecture.md)
- [Threads](./threading.md)
Expand Down
13 changes: 13 additions & 0 deletions guide/src/feature-flags.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Feature flags

By default openraft enables no features.

- `bt`: attaches backtrace to generated errors.

- `serde`: derives `serde::Serialize, serde::Deserialize` for type that are used
in storage and network, such as `Vote` or `AppendEntriesRequest`.

- `single-term-leader`: allows only one leader to be elected in each `term`.
This is the standard raft policy, which increases election confliction rate
but reduce `LogId`(`(term, node_id, index)` to `(term, index)`) size.
Read more about how it is implemented in [`vote`](./vote.md)
97 changes: 95 additions & 2 deletions guide/src/vote.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,25 @@

```rust
struct Vote<NID: NodeId> {
term: u64,
node_id: NID,
leader_id: LeaderId<NID>
committed: bool
}

// Advanced mode(default):
#[cfg(not(feature = "single-term-leader"))]
pub struct LeaderId<NID: NodeId>
{
pub term: u64,
pub node_id: NID,
}

// Standard raft mode:
#[cfg(feature = "single-term-leader")]
pub struct LeaderId<NID: NodeId>
{
pub term: u64,
pub voted_for: Option<NID>,
}
```

`vote` in openraft defines the pseudo **time**(in other word, defines every `leader`) in a distributed consensus.
Expand All @@ -15,6 +30,19 @@ In a standard raft, the corresponding concept is `term`.
Although in standard raft a single `term` is not enough to define a **time
point**.

In openraft, RPC validity checking(such as when handling vote request, or
append-entries request) is very simple: **A node grants a `Vote` which is greater than its last seen `Vote`**:

```rust
fn handle_vote(vote: Vote) {
if !(vote >= self.vote) {
return Err(())
}
save_vote(vote);
Ok(())
}
```

Every server state(leader, candidate, follower or learner) has a unique
corresponding `vote`, thus `vote` can be used to identify different server
states, i.e, if the `vote` changes, the server state must have changed.
Expand All @@ -38,3 +66,68 @@ E.g.:

- A vote `(term=1, node_id=1, committed=false|true)` is in another different
follower/learner state for node-3.


## Partial order

`Vote` in openraft is partially ordered value,
i.e., it is legal that `!(vote_a => vote_b) && !(vote_a <= vote_b)`.
Because `Vote.leader_id` may be a partial order value:


## LeaderId: advanced mode and standard mode

Openraft provides two `LeaderId` type, which can be switched with feature
`single-term-leader`:

- `cargo build` without `single-term-leader`, is the advanced mode, the default mode:
It builds openraft with `LeaderId:(term, node_id)`, which is totally ordered.
Which means, in a single `term`, there could be more than one leaders
elected(although only the last is valid and can commit logs).

- Pros: election conflict is minimized,

- Cons: `LogId` becomes larger: every log has to store an additional `NodeId` in `LogId`:
`LogId: {{term, NodeId}, index}`.
If an application uses a big `NodeId` type, e.g., UUID, the penalty may not
be negligible.

- `cargo build --features "single-term-leader"` builds openraft in standard raft mode with:
`LeaderId:(term, voted_for:Option<NodeId>)`, which makes `LeaderId` and `Vote`
**partially-ordered** values. In this mode, only one leader can be elected in
each `term`.

The partial order relation of `LeaderId`:

```
LeaderId(3, None) > LeaderId(2, None): true
LeaderId(3, None) > LeaderId(2, Some(y)): true
LeaderId(3, None) == LeaderId(3, None): true
LeaderId(3, Some(x)) > LeaderId(2, Some(y)): true
LeaderId(3, Some(x)) > LeaderId(3, None): true
LeaderId(3, Some(x)) == LeaderId(3, Some(x)): true
LeaderId(3, Some(x)) > LeaderId(3, Some(y)): false
```

The partial order between `Vote` is defined as:
Given two `Vote` `a` and `b`:
`a > b` iff:

```rust
a.leader_id > b.leader_id || (
!(a.leader_id < b.leader_id) && a.committed > b.committed
)
```

In other words, if `a.leader_id` and `b.leader_id` is not
comparable(`!(a.leader_id>=b.leader_id) && !(a.leader_id<=b.leader_id)`), use
field `committed` to determine the order between `a` and `b`.

Because a leader must be granted by a quorum before committing any log, two
incomparable `leader_id` can not both be granted.
So let a committed `Vote` override a incomparable non-committed is safe.

- Pros: `LogId` just store a `term`.

- Cons: election conflicting rate may increase.

16 changes: 16 additions & 0 deletions openraft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ anyhow = { workspace = true }
async-entry = { workspace = true }
lazy_static = { workspace = true }
pretty_assertions = { workspace = true }
serde_json = { workspace = true }
tracing-appender = { workspace = true }
tracing-subscriber = { workspace = true }

Expand All @@ -58,5 +59,20 @@ bt = ["anyerror/backtrace", "anyhow/backtrace"]
# If you'd like to use `serde` to serialize messages.
serde = ["dep:serde"]

# Turn on this feature it allows at most ONE quorum-granted leader for each term.
# This is the way standard raft does, by making the LeaderId a partial order value.
#
# - With this feature on:
# It is more likely to conflict during election. But every log only needs to store one `term` in it.
#
# - With this feature off:
# Election conflict rate will be reduced, but every log has to store a `LeaderId{ term, node_id}`,
# which may be costly if an application uses a big NodeId type.
#
# This feature is disabled by default.
single-term-leader = []

# default = ["single-term-leader"]

[package.metadata.docs.rs]
features = ["docinclude"] # Activate `docinclude` during docs.rs build.
16 changes: 13 additions & 3 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,13 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// If we receive a response with a greater term, then revert to follower and abort this
// request.
if let AppendEntriesResponse::HigherVote(vote) = data {
debug_assert!(
&vote > self.engine.state.get_vote(),
"committed vote({}) has total order relation with other votes({})",
self.engine.state.get_vote(),
vote
);

let res = self.engine.vote_handler().handle_message_vote(&vote);
self.run_engine_commands::<Entry<C>>(&[]).await?;

Expand Down Expand Up @@ -617,7 +624,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
id: self.id,

// --- data ---
current_term: self.engine.state.get_vote().term,
current_term: self.engine.state.get_vote().leader_id().get_term(),
last_log_index: self.engine.state.last_log_id().index(),
last_applied: self.engine.state.committed().copied(),
snapshot: self.engine.state.snapshot_meta.last_log_id,
Expand Down Expand Up @@ -810,11 +817,14 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
"get current_leader"
);

if !self.engine.state.get_vote().committed {
let vote = self.engine.state.get_vote();

if !vote.is_committed() {
return None;
}

let id = self.engine.state.get_vote().node_id;
// Safe unwrap(): vote that is committed has to already have voted for some node.
let id = vote.leader_id().voted_for().unwrap();

// TODO: `is_voter()` is slow, maybe cache `current_leader`,
// e.g., only update it when membership or vote changes
Expand Down
16 changes: 6 additions & 10 deletions openraft/src/defensive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,15 @@ where

let curr = h.unwrap_or_default();

if vote.term < curr.term {
return Err(DefensiveError::new(ErrorSubject::Vote, Violation::TermNotAscending {
curr: curr.term,
to: vote.term,
})
.into());
}

if vote >= &curr {
Ok(())
// OK
} else {
Err(DefensiveError::new(ErrorSubject::Vote, Violation::NonIncrementalVote { curr, to: *vote }).into())
return Err(
DefensiveError::new(ErrorSubject::Vote, Violation::VoteNotAscending { curr, to: *vote }).into(),
);
}

Ok(())
}

/// The log entries fed into a store must be consecutive otherwise it is a bug.
Expand Down
18 changes: 9 additions & 9 deletions openraft/src/engine/elect_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ use crate::engine::Engine;
use crate::engine::LogIdList;
use crate::raft::VoteRequest;
use crate::raft_state::VoteStateReader;
use crate::CommittedLeaderId;
use crate::EffectiveMembership;
use crate::LeaderId;
use crate::LogId;
use crate::Membership;
use crate::MetricsChangeFlags;
use crate::Vote;

fn log_id(term: u64, index: u64) -> LogId<u64> {
LogId::<u64> {
leader_id: LeaderId { term, node_id: 1 },
leader_id: CommittedLeaderId::new(term, 1),
index,
}
}
Expand All @@ -33,7 +33,7 @@ fn m12() -> Membership<u64, ()> {

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::default();
eng.state.log_ids = LogIdList::new([LogId::new(LeaderId::new(0, 0), 0)]);
eng.state.log_ids = LogIdList::new([LogId::new(CommittedLeaderId::new(0, 0), 0)]);
eng.state.enable_validate = false; // Disable validation for incomplete state
eng
}
Expand Down Expand Up @@ -76,20 +76,20 @@ fn test_elect() -> anyhow::Result<()> {
Command::RebuildReplicationStreams { targets: vec![] },
Command::AppendBlankLog {
log_id: LogId {
leader_id: LeaderId { term: 1, node_id: 1 },
leader_id: CommittedLeaderId::new(1, 1),
index: 1,
},
},
Command::ReplicateCommitted {
committed: Some(LogId {
leader_id: LeaderId { term: 1, node_id: 1 },
leader_id: CommittedLeaderId::new(1, 1),
index: 1,
},),
},
Command::LeaderCommit {
already_committed: None,
upto: LogId {
leader_id: LeaderId { term: 1, node_id: 1 },
leader_id: CommittedLeaderId::new(1, 1),
index: 1,
},
},
Expand Down Expand Up @@ -139,20 +139,20 @@ fn test_elect() -> anyhow::Result<()> {
Command::RebuildReplicationStreams { targets: vec![] },
Command::AppendBlankLog {
log_id: LogId {
leader_id: LeaderId { term: 2, node_id: 1 },
leader_id: CommittedLeaderId::new(2, 1),
index: 1,
},
},
Command::ReplicateCommitted {
committed: Some(LogId {
leader_id: LeaderId { term: 2, node_id: 1 },
leader_id: CommittedLeaderId::new(2, 1),
index: 1,
},),
},
Command::LeaderCommit {
already_committed: None,
upto: LogId {
leader_id: LeaderId { term: 2, node_id: 1 },
leader_id: CommittedLeaderId::new(2, 1),
index: 1,
},
},
Expand Down
7 changes: 5 additions & 2 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ where
/// Start to elect this node as leader
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) fn elect(&mut self) {
let v = Vote::new(self.state.get_vote().term + 1, self.config.id);
let v = Vote::new(self.state.get_vote().leader_id().term + 1, self.config.id);
// Safe unwrap(): it won't reject itself ˙–˙
self.vote_handler().handle_message_vote(&v).unwrap();

Expand Down Expand Up @@ -572,7 +572,10 @@ where
}

fn assign_log_ids<'a, Ent: RaftEntry<NID, N> + 'a>(&mut self, entries: impl Iterator<Item = &'a mut Ent>) {
let mut log_id = LogId::new(self.state.get_vote().leader_id(), self.state.last_log_id().next_index());
let mut log_id = LogId::new(
self.state.get_vote().committed_leader_id().unwrap(),
self.state.last_log_id().next_index(),
);
for entry in entries {
entry.set_log_id(&log_id);
tracing::debug!("assign log id: {}", log_id);
Expand Down
Loading

0 comments on commit 4a85ee9

Please sign in to comment.