Skip to content

Commit

Permalink
Change: only purge logs that are in snapshot
Browse files Browse the repository at this point in the history
Let `snapshot+logs` be a complete state of a raft node.

The Assumption before is `state_machine+logs` is a complete state of a
raft node. This requires state machine to persist the state every time
applying a log, which would be an innecessary overhead.

- Change: remove ENV config entries. Do not let a lib be affected by
  environment variables.

- Change: remove `Config.keep_unsnapshoted_log`: now by default, logs
  not included in snapshot won't be deleted.

  Rename `Config.max_applied_log_to_keep` to `max_in_snapshot_log_to_keep`.
  • Loading branch information
drmingdrmer committed Aug 28, 2022
1 parent 15efb3a commit d0d04b2
Show file tree
Hide file tree
Showing 21 changed files with 173 additions and 352 deletions.
45 changes: 13 additions & 32 deletions openraft/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,75 +83,56 @@ fn parse_snapshot_policy(src: &str) -> Result<SnapshotPolicy, ConfigError> {
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct Config {
/// The application specific name of this Raft cluster
#[clap(long, env = "RAFT_CLUSTER_NAME", default_value = "foo")]
#[clap(long, default_value = "foo")]
pub cluster_name: String,

/// The minimum election timeout in milliseconds
#[clap(long, env = "RAFT_ELECTION_TIMEOUT_MIN", default_value = "150")]
#[clap(long, default_value = "150")]
pub election_timeout_min: u64,

/// The maximum election timeout in milliseconds
#[clap(long, env = "RAFT_ELECTION_TIMEOUT_MAX", default_value = "300")]
#[clap(long, default_value = "300")]
pub election_timeout_max: u64,

/// The heartbeat interval in milliseconds at which leaders will send heartbeats to followers
#[clap(long, env = "RAFT_HEARTBEAT_INTERVAL", default_value = "50")]
#[clap(long, default_value = "50")]
pub heartbeat_interval: u64,

/// The timeout for sending a snapshot segment, in millisecond
#[clap(long, env = "RAFT_INSTALL_SNAPSHOT_TIMEOUT", default_value = "200")]
#[clap(long, default_value = "200")]
pub install_snapshot_timeout: u64,

/// The maximum number of entries per payload allowed to be transmitted during replication
///
/// If this is too low, it will take longer for the nodes to be brought up to
/// consistency with the rest of the cluster.
#[clap(long, env = "RAFT_MAX_PAYLOAD_ENTRIES", default_value = "300")]
#[clap(long, default_value = "300")]
pub max_payload_entries: u64,

/// The distance behind in log replication a follower must fall before it is considered lagging
///
/// Once a replication stream transition into line-rate state, the target node will be considered safe to join a
/// cluster.
#[clap(long, env = "RAFT_REPLICATION_LAG_THRESHOLD", default_value = "1000")]
#[clap(long, default_value = "1000")]
pub replication_lag_threshold: u64,

/// The snapshot policy to use for a Raft node.
#[clap(
long,
env = "RAFT_SNAPSHOT_POLICY",
default_value = "since_last:5000",
parse(try_from_str=parse_snapshot_policy)
)]
pub snapshot_policy: SnapshotPolicy,

/// Whether to keep `applied_log`s that are not included by snapshots.
///
/// If your application may rebuild it's state machine from snapshots,
/// please set this to true.
///
/// By default, `OpenRaft` purges `applied_log`s from time to time regardless of snapshots, because it assumes once
/// logs are `applied` to the state machine, logs are persisted on disk.
///
/// If an implementation does not persist data when `RaftStorage::apply_to_state_machine()` returns, and just
/// relies on `snapshot` to rebuild the state machine when the next time it restarts, the application must always
/// set `keep_unsnapshoted_log` to `true`, so that only logs that are already included in a snapshot will be
/// purged.
//
// This is another way to implement:
// #[clap(long, env = "RAFT_KEEP_UNSNAPSHOTED_LOG",
// default_value_t = false,
// value_parser=clap::value_parser!(bool))]
#[clap(long, env = "RAFT_KEEP_UNSNAPSHOTED_LOG")]
pub keep_unsnapshoted_log: bool,

/// The maximum snapshot chunk size allowed when transmitting snapshots (in bytes)
#[clap(long, env = "RAFT_SNAPSHOT_MAX_CHUNK_SIZE", default_value = "3MiB", parse(try_from_str=parse_bytes_with_unit))]
#[clap(long, default_value = "3MiB", parse(try_from_str=parse_bytes_with_unit))]
pub snapshot_max_chunk_size: u64,

/// The maximum number of applied logs to keep before purging
#[clap(long, env = "RAFT_MAX_APPLIED_LOG_TO_KEEP", default_value = "1000")]
pub max_applied_log_to_keep: u64,
/// The maximum number of logs to keep that are already included in **snapshot**.
///
/// Logs that are not in snapshot will never be purged.
#[clap(long, default_value = "1000")]
pub max_in_snapshot_log_to_keep: u64,

/// The minimal number of applied logs to purge in a batch.
#[clap(long, default_value = "1")]
Expand Down
17 changes: 2 additions & 15 deletions openraft/src/config/config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ fn test_build() -> anyhow::Result<()> {
"--max-payload-entries=201",
"--replication-lag-threshold=202",
"--snapshot-policy=since_last:203",
"--keep-unsnapshoted-log",
"--snapshot-max-chunk-size=204",
"--max-applied-log-to-keep=205",
"--max-in-snapshot-log-to-keep=205",
"--purge-batch-size=207",
])?;

Expand All @@ -70,25 +69,13 @@ fn test_build() -> anyhow::Result<()> {
assert_eq!(201, config.max_payload_entries);
assert_eq!(202, config.replication_lag_threshold);
assert_eq!(SnapshotPolicy::LogsSinceLast(203), config.snapshot_policy);
assert_eq!(true, config.keep_unsnapshoted_log);
assert_eq!(204, config.snapshot_max_chunk_size);
assert_eq!(205, config.max_applied_log_to_keep);
assert_eq!(205, config.max_in_snapshot_log_to_keep);
assert_eq!(207, config.purge_batch_size);

Ok(())
}

#[test]
fn test_config_keep_unsnapshoted_log() -> anyhow::Result<()> {
let config = Config::build(&["foo", "--keep-unsnapshoted-log"])?;
assert_eq!(true, config.keep_unsnapshoted_log);

let config = Config::build(&["foo"])?;
assert_eq!(false, config.keep_unsnapshoted_log);

Ok(())
}

#[test]
fn test_config_enable_tick() -> anyhow::Result<()> {
let config = Config::build(&["foo", "--enable-tick=false"])?;
Expand Down
7 changes: 4 additions & 3 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.storage.save_vote(&state.vote).await?;

self.engine = Engine::new(self.id, &state, EngineConfig {
max_applied_log_to_keep: self.config.max_applied_log_to_keep,
max_in_snapshot_log_to_keep: self.config.max_in_snapshot_log_to_keep,
purge_batch_size: self.config.purge_batch_size,
keep_unsnapshoted_log: self.config.keep_unsnapshoted_log,
});

// Fetch the most recent snapshot in the system.
Expand Down Expand Up @@ -850,7 +849,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// TODO: add building-session id to identify different building
match result {
SnapshotResult::Ok(meta) => {
self.engine.update_snapshot(meta);
self.engine.finish_building_snapshot(meta);
self.run_engine_commands::<Entry<C>>(&[]).await?;
}
SnapshotResult::StorageError(sto_err) => {
Expand All @@ -868,6 +867,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
/// If force is True, it will skip the threshold check and start creating snapshot as demanded.
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) async fn trigger_log_compaction_if_needed(&mut self, force: bool) {
tracing::debug!("trigger_log_compaction_if_needed: force: {}", force);

if let SnapshotState::None = self.snapshot_state {
// Continue.
} else {
Expand Down
6 changes: 6 additions & 0 deletions openraft/src/core/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ pub(crate) fn snapshot_is_within_half_of_threshold(
last_log_index: &u64,
threshold: &u64,
) -> bool {
tracing::debug!(
"snapshot_last_indeix:{}, last_log_index:{}, threshold: {}",
snapshot_last_index,
last_log_index,
threshold
);
// Calculate distance from actor's last log index.
let distance_from_line = last_log_index.saturating_sub(*snapshot_last_index);

Expand Down
61 changes: 7 additions & 54 deletions openraft/src/engine/calc_purge_upto_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ fn eng() -> Engine<u64, ()> {

#[test]
fn test_calc_purge_upto() -> anyhow::Result<()> {
// last_purged_log_id, committed, max_keep, want
// last_purged_log_id, last_snapshot_log_id, max_keep, want
// last_applied should not affect the purge
let cases = vec![
//
(None, None, 0, None),
Expand Down Expand Up @@ -53,71 +54,23 @@ fn test_calc_purge_upto() -> anyhow::Result<()> {
(Some(log_id(1, 2)), Some(log_id(3, 4)), 5, None),
];

for (last_purged, committed, max_keep, want) in cases {
for (last_purged, snapshot_last_log_id, max_keep, want) in cases {
let mut eng = eng();
eng.config.keep_unsnapshoted_log = false;
eng.config.max_applied_log_to_keep = max_keep;
eng.config.max_in_snapshot_log_to_keep = max_keep;
eng.config.purge_batch_size = 1;

if let Some(last_purged) = last_purged {
eng.state.log_ids.purge(&last_purged);
}
eng.state.committed = committed;
eng.snapshot_meta.last_log_id = snapshot_last_log_id;
let got = eng.calc_purge_upto();

assert_eq!(
want, got,
"case: last_purged: {:?}, last_applied: {:?}, max_keep: {}",
last_purged, committed, max_keep
"case: last_purged: {:?}, snapshot_last_log_id: {:?}, max_keep: {}",
last_purged, snapshot_last_log_id, max_keep
);
}

Ok(())
}

#[test]
// in this test, keep_unsnapshoted_log is set to true.
// logs being purged should at most the last that was in the snapshot.
fn test_keep_unsnapshoted() -> anyhow::Result<()> {
let cases = vec![
// last_deleted, last_applied, last_snapshoted, max_keep, want
// empty test
(None, None, None, 0, None),
(None, None, None, 1, None),
// nothing in snapshot
(Some(log_id(1, 1)), Some(log_id(2, 2)), None, 0, None),
(Some(log_id(1, 1)), Some(log_id(5, 5)), None, 0, None),
// snapshot kept up
(None, Some(log_id(5, 5)), Some(log_id(3, 4)), 0, Some(log_id(3, 4))),
(
Some(log_id(1, 1)),
Some(log_id(5, 5)),
Some(log_id(5, 5)),
0,
Some(log_id(5, 5)),
),
];

for (purged, committed, snapshot, keep, want) in cases {
let mut eng = eng();
eng.config.keep_unsnapshoted_log = true;
eng.config.max_applied_log_to_keep = keep;
eng.config.purge_batch_size = 1;

if let Some(last_purged) = purged {
eng.state.log_ids.purge(&last_purged);
}
eng.state.committed = committed;
eng.snapshot_meta.last_log_id = snapshot;

let got = eng.calc_purge_upto();

assert_eq!(
want, got,
"case: purged: {:?}, committed: {:?}, snapshot: {:?}, keep: {}",
purged, committed, snapshot, keep
)
}

Ok(())
}
47 changes: 23 additions & 24 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,17 @@ use crate::Vote;
#[derive(PartialEq, Eq)]
pub(crate) struct EngineConfig {
/// The maximum number of applied logs to keep before purging.
pub(crate) max_applied_log_to_keep: u64,
pub(crate) max_in_snapshot_log_to_keep: u64,

/// The minimal number of applied logs to purge in a batch.
pub(crate) purge_batch_size: u64,

/// whether to keep applied log that are not included in snapshots.
/// false by default
pub(crate) keep_unsnapshoted_log: bool,
}

impl Default for EngineConfig {
fn default() -> Self {
Self {
max_applied_log_to_keep: 1000,
max_in_snapshot_log_to_keep: 1000,
purge_batch_size: 256,
keep_unsnapshoted_log: false,
}
}
}
Expand Down Expand Up @@ -433,7 +428,6 @@ where
already_committed: prev_committed,
upto: committed.unwrap(),
});
self.purge_applied_log();
}
}

Expand Down Expand Up @@ -540,13 +534,13 @@ where
}
}

/// Purge applied log if needed.
/// Purge logs that are already in snapshot if needed.
///
/// `max_applied_log_to_keep` specifies the number of applied logs to keep.
/// `max_applied_log_to_keep==0` means every applied log can be purged.
/// `max_in_snapshot_log_to_keep` specifies the number of logs already included in snapshot to keep.
/// `max_in_snapshot_log_to_keep==0` means to purge every log stored in snapshot.
// NOTE: simple method, not tested.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn purge_applied_log(&mut self) {
pub(crate) fn purge_in_snapshot_log(&mut self) {
if let Some(purge_upto) = self.calc_purge_upto() {
self.purge_log(purge_upto);
}
Expand All @@ -562,28 +556,21 @@ where
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn calc_purge_upto(&mut self) -> Option<LogId<NID>> {
let st = &self.state;
let last_applied = &st.committed;
let max_keep = self.config.max_applied_log_to_keep;
let max_keep = self.config.max_in_snapshot_log_to_keep;
let batch_size = self.config.purge_batch_size;

let mut purge_end = last_applied.next_index().saturating_sub(max_keep);

if self.config.keep_unsnapshoted_log {
let idx = self.snapshot_meta.last_log_id.next_index();
tracing::debug!("the very last log included in snapshots: {}", idx);
purge_end = idx.min(purge_end);
}
let purge_end = self.snapshot_meta.last_log_id.next_index().saturating_sub(max_keep);

tracing::debug!(
last_applied = debug(last_applied),
snapshot_last_log_id = debug(self.snapshot_meta.last_log_id),
max_keep,
"try purge: (-oo, {})",
purge_end
);

if st.last_purged_log_id().next_index() + batch_size > purge_end {
tracing::debug!(
last_applied = debug(last_applied),
snapshot_last_log_id = debug(self.snapshot_meta.last_log_id),
max_keep,
last_purged_log_id = display(st.last_purged_log_id().summary()),
batch_size,
Expand Down Expand Up @@ -640,6 +627,7 @@ where
// membership.log_id = (10, 5);
// local_effective.log_id = (2, 10);
if effective.log_id.index() <= m.log_id.index() {
// TODO: if effective membership changes, call `update_repliation()`
effective = m;
}

Expand Down Expand Up @@ -743,7 +731,6 @@ where
already_committed: prev_committed,
upto: self.state.committed.unwrap(),
});
self.purge_applied_log();
}
}

Expand Down Expand Up @@ -801,6 +788,18 @@ where
self.purge_log(snap_last_log_id)
}

#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn finish_building_snapshot(&mut self, meta: SnapshotMeta<NID, N>) {
tracing::info!("finish_building_snapshot: {:?}", meta);

let updated = self.update_snapshot(meta);
if !updated {
return;
}

self.purge_in_snapshot_log();
}

/// Update engine state when a new snapshot is built or installed.
///
/// Engine records only the metadata of a snapshot. Snapshot data is stored by RaftStorage implementation.
Expand Down
Loading

0 comments on commit d0d04b2

Please sign in to comment.