Skip to content

Commit

Permalink
Feature: use blank log for heartbeat (#483)
Browse files Browse the repository at this point in the history
* Feature: use blank log for heartbeat

Heartbeat in standard raft is the way for a leader to assert it is still alive.
- A leader send heartbeat at a regular interval.
- A follower that receives a heartbeat believes there is an active leader thus it rejects election request(`send_vote`) from another node unreachable to the leader, for a short period.

Openraft heartbeat is a blank log

Such a heartbeat mechanism depends on clock time.
But raft as a distributed consensus already has its own **pseudo time** defined very well.
The **pseudo time** in openraft is a tuple `(vote, last_log_id)`, compared in dictionary order.

Why it works

To refuse the election by a node that does not receive recent messages from the current leader,
just let the active leader send a **blank log** to increase the **pseudo time** on a quorum.

Because the leader must have the greatest **pseudo time**,
thus by comparing the **pseudo time**, a follower automatically refuse election request from a node unreachable to the leader.

And comparing the **pseudo time** is already done by `handle_vote_request()`,
there is no need to add another timer for the active leader.

Other changes:

- Feature: add API to switch timeout based events:
  - `Raft::enable_tick()`: switch on/off election and heartbeat.
  - `Raft::enable_heartbeat()`: switch on/off heartbeat.
  - `Raft::enable_elect()`: switch on/off election.

  These methods make some testing codes easier to write.
  The corresponding `Config` entries are also added:
  `Config::enable_tick`
  `Config::enable_heartbeat`
  `Config::enable_elect`

- Refactor: remove Engine `Command::RejectElection`.
  Rejecting election now is part of `handle_vote_req()` as blank-log
  heartbeat is introduced.

- Refactor: heartbeat is removed from `ReplicationCore`.
  Instead, heartbeat is emitted by `RaftCore`.

- Fix: when failed to sending append-entries, do not clear
  `need_to_replicate` flag.

- CI: add test with higher network delay.

- Doc: explain why using blank log as heartbeat.

- Fix: #151
  • Loading branch information
drmingdrmer authored Aug 1, 2022
1 parent a37a450 commit 956177d
Show file tree
Hide file tree
Showing 63 changed files with 603 additions and 351 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ jobs:
store_defensive:
- "on"
- "off"
send_delay:
- "0"
- "30"

steps:
- name: Setup | Checkout
Expand All @@ -31,7 +34,8 @@ jobs:
RUST_TEST_THREADS: 2
RUST_LOG: debug
RUST_BACKTRACE: full
RAFT_STORE_DEFENSIVE: ${{ matrix.store_defensive }}
OPENRAFT_STORE_DEFENSIVE: ${{ matrix.store_defensive }}
OPENRAFT_NETWORK_SEND_DELAY: ${{ matrix.send_delay }}


- name: Build | Release Mode
Expand Down
4 changes: 2 additions & 2 deletions .mergify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ queue_rules:
# - '#check-pending=0'
- '#check-success>=2'
- check-success=check-subject
- check-success=unittest (on)
- check-success=unittest (on, 0)
- check-success~=unittest

pull_request_rules:
Expand All @@ -14,7 +14,7 @@ pull_request_rules:
- "#approved-reviews-by>=1"
- "#changes-requested-reviews-by=0"
- check-success=check-subject
- check-success=unittest (on)
- check-success=unittest (on, 0)
actions:
queue:
name: feature_queue
Expand Down
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
all: test lint fmt defensive_test doc
all: test lint fmt defensive_test send_delay_test doc

defensive_test:
RAFT_STORE_DEFENSIVE=on cargo test
OPENRAFT_STORE_DEFENSIVE=on cargo test

send_delay_test:
OPENRAFT_NETWORK_SEND_DELAY=30 cargo test

test: lint fmt
cargo test
Expand Down
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- [Internal](./internal.md)
- [Architecture](./architecture.md)
- [Threads](./threading.md)
- [Heartbeat](./heartbeat.md)
- [Vote](./vote.md)
- [Replication](./replication.md)
- [Delete-conflicting-logs](./delete_log.md)
Expand Down
40 changes: 40 additions & 0 deletions guide/src/heartbeat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Heartbeat in openraft

## Heartbeat in standard raft

Heartbeat in standard raft is the way for a leader to assert it is still alive:
- A leader send heartbeat at a regular interval.
- A follower that receives a heartbeat believes there is an active leader thus it rejects election request(`send_vote`) from another node unreachable to the leader, for a short period.

## Openraft heartbeat is a blank log

Such a heartbeat mechanism depends on clock time.
But raft as a distributed consensus already has its own **pseudo time** defined very well.
The **pseudo time** in openraft is a tuple `(vote, last_log_id)`, compared in dictionary order.

### Why it works

To refuse the election by a node that does not receive recent messages from the current leader,
just let the active leader send a **blank log** to increase the **pseudo time** on a quorum.

Because the leader must have the greatest **pseudo time**,
thus by comparing the **pseudo time**, a follower automatically refuse election request from a node unreachable to the leader.

And comparing the **pseudo time** is already done by `handle_vote_request()`,
there is no need to add another timer for the active leader.

Thus making heartbeat request a blank log is the simplest way.

## Why blank log heartbeat?

- Simple, get rid of a timer.
- Easy to prove, and reduce code complexity.

## Other Concerns

- More raft logs are generated.





49 changes: 49 additions & 0 deletions openraft/src/config/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Raft runtime configuration.

use std::sync::atomic::AtomicBool;

use clap::Parser;
use rand::thread_rng;
use rand::Rng;
Expand Down Expand Up @@ -154,6 +156,53 @@ pub struct Config {
/// The minimal number of applied logs to purge in a batch.
#[clap(long, default_value = "1")]
pub purge_batch_size: u64,

/// Enable or disable tick.
///
/// If ticking is disabled, timeout based events are all disabled:
/// a follower won't wake up to enter candidate state,
/// and a leader won't send heartbeat.
///
/// This flag is mainly used for test, or to build a consensus system that does not depend on wall clock.
/// The value of this config is evaluated as follow:
/// - being absent: true
/// - `--enable-tick`: true
/// - `--enable-tick=true`: true
/// - `--enable-tick=false`: false
#[clap(long,
default_value_t = true,
action = clap::ArgAction::Set,
default_missing_value = "true")]
pub enable_tick: bool,

/// Whether a leader sends heartbeat log to following nodes, i.e., followers and learners.
#[clap(long,
default_value_t = true,
action = clap::ArgAction::Set,
default_missing_value = "true")]
pub enable_heartbeat: bool,

/// Whether a follower will enter candidate state if it does not receive message from the leader for a while.
#[clap(long,
default_value_t = true,
action = clap::ArgAction::Set,
default_missing_value = "true")]
pub enable_elect: bool,
}

/// Updatable config for a raft runtime.
pub(crate) struct RuntimeConfig {
pub(crate) enable_heartbeat: AtomicBool,
pub(crate) enable_elect: AtomicBool,
}

impl RuntimeConfig {
pub(crate) fn new(config: &Config) -> Self {
Self {
enable_heartbeat: AtomicBool::from(config.enable_heartbeat),
enable_elect: AtomicBool::from(config.enable_elect),
}
}
}

impl Default for Config {
Expand Down
53 changes: 52 additions & 1 deletion openraft/src/config/config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn test_build() -> anyhow::Result<()> {
}

#[test]
fn test_config_build_() -> anyhow::Result<()> {
fn test_config_keep_unsnapshoted_log() -> anyhow::Result<()> {
let config = Config::build(&["foo", "--keep-unsnapshoted-log"])?;
assert_eq!(true, config.keep_unsnapshoted_log);

Expand All @@ -88,3 +88,54 @@ fn test_config_build_() -> anyhow::Result<()> {

Ok(())
}

#[test]
fn test_config_enable_tick() -> anyhow::Result<()> {
let config = Config::build(&["foo", "--enable-tick=false"])?;
assert_eq!(false, config.enable_tick);

let config = Config::build(&["foo", "--enable-tick=true"])?;
assert_eq!(true, config.enable_tick);

let config = Config::build(&["foo", "--enable-tick"])?;
assert_eq!(true, config.enable_tick);

let config = Config::build(&["foo"])?;
assert_eq!(true, config.enable_tick);

Ok(())
}

#[test]
fn test_config_enable_heartbeat() -> anyhow::Result<()> {
let config = Config::build(&["foo", "--enable-heartbeat=false"])?;
assert_eq!(false, config.enable_heartbeat);

let config = Config::build(&["foo", "--enable-heartbeat=true"])?;
assert_eq!(true, config.enable_heartbeat);

let config = Config::build(&["foo", "--enable-heartbeat"])?;
assert_eq!(true, config.enable_heartbeat);

let config = Config::build(&["foo"])?;
assert_eq!(true, config.enable_heartbeat);

Ok(())
}

#[test]
fn test_config_enable_elect() -> anyhow::Result<()> {
let config = Config::build(&["foo", "--enable-elect=false"])?;
assert_eq!(false, config.enable_elect);

let config = Config::build(&["foo", "--enable-elect=true"])?;
assert_eq!(true, config.enable_elect);

let config = Config::build(&["foo", "--enable-elect"])?;
assert_eq!(true, config.enable_elect);

let config = Config::build(&["foo"])?;
assert_eq!(true, config.enable_elect);

Ok(())
}
1 change: 1 addition & 0 deletions openraft/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ mod error;
#[cfg(test)] mod config_test;

pub use config::Config;
pub(crate) use config::RuntimeConfig;
pub use config::SnapshotPolicy;
pub use error::ConfigError;
1 change: 0 additions & 1 deletion openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}

self.set_next_election_time(false);
self.reject_election_for_a_while();

if req.vote > self.engine.state.vote {
self.engine.state.vote = req.vote;
Expand Down
Loading

0 comments on commit 956177d

Please sign in to comment.