Skip to content

Commit

Permalink
Change: RaftStorage::get_log_state() returns last purge log id
Browse files Browse the repository at this point in the history
-   Change: `get_log_state()` returns the `last_purged_log_id` instead of the `first_log_id`.
    Because there are some cases in which the log is empty:
    when an installed snapshot covers all logs,
    or when `max_applied_log_to_keep` is 0.

    Returning `None` alone does not make clear whether there are no logs
    at all or whether all logs have been deleted.

    In such cases, raft still needs to maintain log continuity
    when replicating. Thus the last log id that once existed is important.
    Previously this was done by checking the `last_applied_log_id`, which was
    dirty and buggy.

    Now an implementation of `RaftStorage` has to maintain the
    `last_purged_log_id` in its store.

-   Change: Remove `first_id_in_log()`, `last_log_id()`, `first_known_log_id()`,
    because concepts are changed.

-   Change: Split `delete_logs()` into two methods for clarity:

    `delete_conflict_logs_since()` for deleting conflict logs when the
    replication receiving end finds a conflicting log.

    `purge_logs_upto()` for cleaning applied logs

-   Change: Rename `finalize_snapshot_installation()` to `install_snapshot()`.

-   Refactor: Remove `initial_replicate_to_state_machine()`, which does nothing
    more than a normal applying-logs.

-   Refactor: Remove `enum UpdateCurrentLeader`. It is just a wrapper of Option.
  • Loading branch information
drmingdrmer committed Jan 16, 2022
1 parent 5cfbd54 commit a52a930
Show file tree
Hide file tree
Showing 16 changed files with 403 additions and 513 deletions.
58 changes: 47 additions & 11 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use openraft::async_trait::async_trait;
use openraft::raft::Entry;
use openraft::raft::EntryPayload;
use openraft::storage::HardState;
use openraft::storage::LogState;
use openraft::storage::Snapshot;
use openraft::AppData;
use openraft::AppDataResponse;
Expand Down Expand Up @@ -83,14 +84,19 @@ pub struct MemStoreStateMachine {

/// An in-memory storage system implementing the `RaftStorage` trait.
pub struct MemStore {
last_purged_log_id: RwLock<Option<LogId>>,

/// The Raft log.
log: RwLock<BTreeMap<u64, Entry<ClientRequest>>>,

/// The Raft state machine.
sm: RwLock<MemStoreStateMachine>,

/// The current hard state.
hs: RwLock<Option<HardState>>,

snapshot_idx: Arc<Mutex<u64>>,

/// The current snapshot.
current_snapshot: RwLock<Option<MemStoreSnapshot>>,
}
Expand All @@ -105,6 +111,7 @@ impl MemStore {
let current_snapshot = RwLock::new(None);

Self {
last_purged_log_id: RwLock::new(None),
log,
sm,
hs,
Expand Down Expand Up @@ -151,29 +158,58 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
Ok(res)
}

async fn get_log_state(&self) -> Result<(Option<LogId>, Option<LogId>), StorageError> {
async fn get_log_state(&self) -> Result<LogState, StorageError> {
let log = self.log.read().await;
let first = log.iter().next().map(|(_, ent)| ent.log_id);
let last = log.iter().rev().next().map(|(_, ent)| ent.log_id);
Ok((first, last))

let last_deleted = *self.last_purged_log_id.read().await;

let last = match last {
None => last_deleted,
Some(x) => Some(x),
};

Ok(LogState {
last_purged_log_id: last_deleted,
last_log_id: last,
})
}

async fn last_applied_state(&self) -> Result<(Option<LogId>, Option<EffectiveMembership>), StorageError> {
let sm = self.sm.read().await;
Ok((sm.last_applied_log, sm.last_membership.clone()))
}

#[tracing::instrument(level = "debug", skip(self, range), fields(range=?range))]
async fn delete_log<R: RangeBounds<u64> + Clone + Debug + Send + Sync>(
&self,
range: R,
) -> Result<(), StorageError> {
/// Delete conflicting logs, i.e., every entry with index >= `log_id.index`.
///
/// Called on the replication receiving end when it finds a conflicting log.
/// Does NOT touch `last_purged_log_id`: conflicting entries were never
/// committed, so they are deleted, not purged.
#[tracing::instrument(level = "debug", skip(self))]
async fn delete_conflict_logs_since(&self, log_id: LogId) -> Result<(), StorageError> {
    tracing::debug!("delete_conflict_logs_since: [{:?}, +oo)", log_id);

    {
        let mut log = self.log.write().await;

        // `split_off` detaches and returns all entries with key >= log_id.index;
        // dropping the returned map removes them in one O(log n) operation.
        log.split_off(&log_id.index);
    }

    Ok(())
}

#[tracing::instrument(level = "debug", skip(self))]
async fn purge_logs_upto(&self, log_id: LogId) -> Result<(), StorageError> {
tracing::debug!("delete_log: [{:?}, +oo)", log_id);

{
let mut ld = self.last_purged_log_id.write().await;
assert!(*ld <= Some(log_id));
*ld = Some(log_id);
}

{
let mut log = self.log.write().await;

let keys = log.range(range).map(|(k, _v)| *k).collect::<Vec<_>>();
let keys = log.range(..=log_id.index).map(|(k, _v)| *k).collect::<Vec<_>>();
for key in keys {
log.remove(&key);
}
Expand Down Expand Up @@ -289,7 +325,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn finalize_snapshot_installation(
async fn install_snapshot(
&self,
meta: &SnapshotMeta,
snapshot: Box<Self::SnapshotData>,
Expand Down
6 changes: 2 additions & 4 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::core::client::ClientRequestEntry;
use crate::core::LeaderState;
use crate::core::LearnerState;
use crate::core::State;
use crate::core::UpdateCurrentLeader;
use crate::error::AddLearnerError;
use crate::error::ChangeMembershipError;
use crate::error::ClientWriteError;
Expand Down Expand Up @@ -220,15 +219,14 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

// TODO(xp): transfer leadership
self.core.set_target_state(State::Learner);
self.core.update_current_leader(UpdateCurrentLeader::Unknown);
self.core.current_leader = None;
return;
}

let membership = &self.core.effective_membership.membership;

let all = membership.all_nodes();
for (id, state) in self.nodes.iter_mut() {
if all.contains(id) {
if membership.contains(id) {
continue;
}

Expand Down
95 changes: 8 additions & 87 deletions openraft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::core::apply_to_state_machine;
use crate::core::RaftCore;
use crate::core::State;
use crate::core::UpdateCurrentLeader;
use crate::error::AppendEntriesError;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
Expand All @@ -11,14 +10,11 @@ use crate::raft_types::LogIdOptionExt;
use crate::AppData;
use crate::AppDataResponse;
use crate::EffectiveMembership;
use crate::ErrorSubject;
use crate::ErrorVerb;
use crate::LogId;
use crate::MessageSummary;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::StorageError;
use crate::StorageIOError;
use crate::Update;

impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> RaftCore<D, R, N, S> {
Expand Down Expand Up @@ -77,7 +73,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

// Update current leader if needed.
if self.current_leader.as_ref() != Some(&msg.leader_id) {
self.update_current_leader(UpdateCurrentLeader::OtherNode(msg.leader_id));
self.current_leader = Some(msg.leader_id);
report_metrics = true;
}

Expand All @@ -104,20 +100,16 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}

#[tracing::instrument(level = "debug", skip(self))]
async fn delete_logs(&mut self, start: u64) -> Result<(), StorageError> {
async fn delete_conflict_logs_since(&mut self, start: LogId) -> Result<(), StorageError> {
// TODO(xp): add a StorageAdapter to provide auxiliary APIs.
// e.g.:
// - extract and manage membership config.
// - keep track of last_log_id, first_log_id,
// RaftStorage should only provides the least basic APIs.

self.storage.delete_log(start..).await?;
self.storage.delete_conflict_logs_since(start).await?;

self.last_log_id = if start == 0 {
None
} else {
self.get_log_id(start - 1).await?
};
self.last_log_id = self.storage.get_log_state().await?.last_log_id;

// TODO(xp): get_membership() should have a defensive check to ensure it always returns Some() if node is
// initialized. Because a node always commit a membership log as the first log entry.
Expand All @@ -136,31 +128,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
Ok(())
}

// TODO(xp): rename it to `find_last_log_id`.
// If there is no log at all, it should return None.
// TODO(xp): self.last_log_id is not clear about what it is: if there is not log, it is set to `last_applied`.
// Consider rename it.
#[tracing::instrument(level = "debug", skip(self))]
async fn get_log_id(&mut self, index: u64) -> Result<Option<LogId>, StorageError> {
assert!(Some(index) >= self.last_applied.index());

if Some(index) == self.last_applied.index() {
return Ok(self.last_applied);
}

let entries = self.storage.get_log_entries(index..=index).await?;

let entry = entries.first().ok_or_else(|| StorageError::IO {
source: StorageIOError::new(
ErrorSubject::LogIndex(index),
ErrorVerb::Read,
anyhow::anyhow!("not found").into(),
),
})?;

Ok(Some(entry.log_id))
}

/// Skip log entries that have the same term as the entries the leader sent.
/// Delete entries since the first mismatching entry from local storage.
/// Returns a slice of entries that are not in local storage.
Expand Down Expand Up @@ -201,7 +168,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// If log 5 is committed by R1, and log 3 is not removed, R5 in future could become a new leader and overrides log
/// 5 on R3.
#[tracing::instrument(level="trace", skip(self, msg_entries), fields(msg_entries=%msg_entries.summary()))]
async fn delete_inconsistent_log<'s, 'e>(&'s mut self, msg_entries: &'e [Entry<D>]) -> Result<(), StorageError> {
async fn find_and_delete_conflict_logs(&mut self, msg_entries: &[Entry<D>]) -> Result<(), StorageError> {
// all msg_entries are inconsistent logs

tracing::debug!(msg_entries=%msg_entries.summary(), "try to delete_inconsistent_log");
Expand All @@ -225,7 +192,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
msg_entries.summary()
);

self.delete_logs(msg_entries[0].log_id.index).await?;
self.delete_conflict_logs_since(msg_entries[0].log_id).await?;

Ok(())
}
Expand Down Expand Up @@ -253,7 +220,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
if let Some(last_log_id) = self.last_log_id {
if mismatched_log_id.index <= last_log_id.index {
tracing::debug!(%mismatched_log_id, "delete inconsistent log since prev_log_id");
self.delete_logs(mismatched_log_id.index).await?;
self.delete_conflict_logs_since(mismatched_log_id).await?;
}
}

Expand All @@ -277,7 +244,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// Before appending, if an entry overrides an inconsistent one, the entries after it must be deleted first.
// Raft requires log ids are in total order by (term,index).
// Otherwise the log id with max index makes committed entry invisible in election.
self.delete_inconsistent_log(entries).await?;
self.find_and_delete_conflict_logs(entries).await?;

self.append_log_entries(entries).await?;

Expand Down Expand Up @@ -414,14 +381,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
async fn replicate_to_state_machine_if_needed(&mut self) -> Result<(), StorageError> {
tracing::debug!(?self.last_applied, "replicate_to_sm_if_needed");

// Perform initial replication to state machine if needed.
if !self.has_completed_initial_replication_to_sm {
// Optimistic update, as failures will cause shutdown.
self.has_completed_initial_replication_to_sm = true;
self.initial_replicate_to_state_machine().await?;
return Ok(());
}

// If we don't have any new entries to replicate, then do nothing.
if self.committed <= self.last_applied {
tracing::debug!(
Expand Down Expand Up @@ -452,42 +411,4 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

Ok(())
}

/// Perform an initial replication of outstanding entries to the state machine.
///
/// This will only be executed once, and only in response to its first payload of entries
/// from the AppendEntries RPC handler.
#[tracing::instrument(level = "debug", skip(self))]
async fn initial_replicate_to_state_machine(&mut self) -> Result<(), StorageError> {
assert!(self.committed <= self.last_log_id);

let stop = self.committed.next_index();

let start = self.last_applied.next_index();
let storage = self.storage.clone();

tracing::debug!(start, stop, ?self.committed, last_log_id=?self.last_log_id, "start stop");

// when self.commit_index is not initialized, e.g. the first heartbeat from leader always has a commit_index to
// be 0, because the leader needs one round of heartbeat to find out the commit index.
if start >= stop {
return Ok(());
}

// Fetch the series of entries which must be applied to the state machine, then apply them.

let entries = storage.get_log_entries(start..stop).await?;

let new_last_applied = entries.last().unwrap();

let data_entries: Vec<_> = entries.iter().collect();

apply_to_state_machine(storage, &data_entries, self.config.max_applied_log_to_keep).await?;

self.last_applied = Some(new_last_applied.log_id);
self.report_metrics(Update::AsIs);
self.trigger_log_compaction_if_needed(false);

Ok(())
}
}
9 changes: 4 additions & 5 deletions openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ use anyerror::AnyError;
use tokio::io::AsyncSeekExt;
use tokio::io::AsyncWriteExt;

use crate::core::delete_applied_logs;
use crate::core::purge_applied_logs;
use crate::core::RaftCore;
use crate::core::SnapshotState;
use crate::core::State;
use crate::core::UpdateCurrentLeader;
use crate::error::InstallSnapshotError;
use crate::error::SnapshotMismatch;
use crate::raft::InstallSnapshotRequest;
Expand Down Expand Up @@ -56,7 +55,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

// Update current leader if needed.
if self.current_leader.as_ref() != Some(&req.leader_id) {
self.update_current_leader(UpdateCurrentLeader::OtherNode(req.leader_id));
self.current_leader = Some(req.leader_id);
report_metrics = true;
}

Expand Down Expand Up @@ -226,7 +225,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

// TODO(xp): do not install if self.last_applied >= snapshot.meta.last_applied

let changes = self.storage.finalize_snapshot_installation(&req.meta, snapshot).await?;
let changes = self.storage.install_snapshot(&req.meta, snapshot).await?;

tracing::debug!("update after apply or install-snapshot: {:?}", changes);

Expand All @@ -236,7 +235,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

if let Some(last_applied) = changes.last_applied {
// Applied logs are not needed.
delete_applied_logs(self.storage.clone(), &last_applied, self.config.max_applied_log_to_keep).await?;
purge_applied_logs(self.storage.clone(), &last_applied, self.config.max_applied_log_to_keep).await?;

// snapshot is installed
self.last_applied = Some(last_applied);
Expand Down
Loading

0 comments on commit a52a930

Please sign in to comment.