diff --git a/async-raft/Cargo.toml b/async-raft/Cargo.toml index 96e78f3c7..cb1009967 100644 --- a/async-raft/Cargo.toml +++ b/async-raft/Cargo.toml @@ -21,7 +21,7 @@ futures = "0.3" log = "0.4" rand = "0.8" serde = { version="1", features=["derive"] } -thiserror = "1.0.20" +thiserror = "1.0.29" tokio = { version="1.8", default-features=false, features=["fs", "io-util", "macros", "rt", "rt-multi-thread", "sync", "time"] } tracing = "0.1.26" tracing-futures = "0.2.4" diff --git a/async-raft/src/core/append_entries.rs b/async-raft/src/core/append_entries.rs index c8f3d834d..02cd7f5b7 100644 --- a/async-raft/src/core/append_entries.rs +++ b/async-raft/src/core/append_entries.rs @@ -212,11 +212,11 @@ impl, S: RaftStorage> Ra #[tracing::instrument(level = "debug", skip(self))] async fn delete_logs(&mut self, start: u64) -> RaftResult<()> { - self.storage.delete_logs_from(start..).await.map_err(|err| self.map_fatal_storage_error(err))?; + self.storage.delete_logs_from(start..).await.map_err(|err| self.map_storage_error(err))?; self.last_log_id = self.get_log_id(start - 1).await?; - let membership = self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?; + let membership = self.storage.get_membership_config().await.map_err(|err| self.map_storage_error(err))?; self.update_membership(membership)?; @@ -231,8 +231,7 @@ impl, S: RaftStorage> Ra return Ok(self.last_applied); } - let entries = - self.storage.get_log_entries(index..=index).await.map_err(|err| self.map_fatal_storage_error(err))?; + let entries = self.storage.get_log_entries(index..=index).await.map_err(|err| self.map_storage_error(err))?; let entry = entries .first() @@ -300,8 +299,7 @@ impl, S: RaftStorage> Ra msg_entries.summary() ); - let entries = - self.storage.get_log_entries(index..end).await.map_err(|err| self.map_fatal_storage_error(err))?; + let entries = self.storage.get_log_entries(index..end).await.map_err(|err| self.map_storage_error(err))?; for (i, ent) in entries.iter().enumerate() { assert_eq!(msg_entries[i].log_id.index, ent.log_id.index); @@ -336,8 +334,7 @@ impl, S: RaftStorage> Ra return Ok(LogId { term: 0, index: 0 }); } - let entries = - self.storage.get_log_entries(start..=start).await.map_err(|err| self.map_fatal_storage_error(err))?; + let entries = self.storage.get_log_entries(start..=start).await.map_err(|err| self.map_storage_error(err))?; let log_id = entries.first().unwrap().log_id; @@ -394,7 +391,7 @@ impl, S: RaftStorage> Ra // Replicate entries to log (same as append, but in follower mode). 
let entry_refs = entries.iter().collect::>(); - self.storage.append_to_log(&entry_refs).await.map_err(|err| self.map_fatal_storage_error(err))?; + self.storage.append_to_log(&entry_refs).await.map_err(|err| self.map_storage_error(err))?; if let Some(entry) = entries.last() { self.last_log_id = entry.log_id; } @@ -433,7 +430,7 @@ impl, S: RaftStorage> Ra .storage .get_log_entries(self.last_applied.index + 1..=self.commit_index) .await - .map_err(|e| self.map_fatal_storage_error(e))?; + .map_err(|e| self.map_storage_error(e))?; let last_log_id = entries.last().map(|x| x.log_id).unwrap(); @@ -441,10 +438,7 @@ impl, S: RaftStorage> Ra tracing::debug!(?last_log_id); let entries_refs: Vec<_> = entries.iter().collect(); - self.storage - .apply_to_state_machine(&entries_refs) - .await - .map_err(|e| self.map_fatal_storage_error(e))?; + self.storage.apply_to_state_machine(&entries_refs).await.map_err(|e| self.map_storage_error(e))?; self.last_applied = last_log_id; @@ -474,12 +468,12 @@ impl, S: RaftStorage> Ra // Fetch the series of entries which must be applied to the state machine, then apply them. - let entries = storage.get_log_entries(start..stop).await.map_err(|e| self.map_fatal_storage_error(e))?; + let entries = storage.get_log_entries(start..stop).await.map_err(|e| self.map_storage_error(e))?; let new_last_applied = entries.last().unwrap(); let data_entries: Vec<_> = entries.iter().collect(); - storage.apply_to_state_machine(&data_entries).await.map_err(|e| self.map_fatal_storage_error(e))?; + storage.apply_to_state_machine(&data_entries).await.map_err(|e| self.map_storage_error(e))?; self.last_applied = new_last_applied.log_id; self.report_metrics(Update::Ignore); diff --git a/async-raft/src/core/client.rs b/async-raft/src/core/client.rs index 979693a0c..6fb87dd6c 100644 --- a/async-raft/src/core/client.rs +++ b/async-raft/src/core/client.rs @@ -32,6 +32,7 @@ use crate::LogId; use crate::MessageSummary; use crate::RaftNetwork; use crate::RaftStorage; +use crate::StorageError; /// A wrapper around a ClientRequest which has been transformed into an Entry, along with its response channel. pub(super) struct ClientRequestEntry { @@ -87,8 +88,12 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage // Thus if a new leader sees only the first one, it needs to append the final config log to let // the change-membership operation to finish. 
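The comment above captures the two-phase nature of membership changes: a joint-consensus config entry is committed first, and a final uniform-config entry must follow before the change is complete. A minimal sketch of the decision a new leader has to make on start-up (simplified, hypothetical payload type, not the crate's exact `EntryPayload`):

    // Sketch: should a new leader append the final membership-config log?
    enum Payload {
        ConfigChange { in_joint_consensus: bool },
        Normal,
    }

    fn needs_final_config_log(last_log: &Payload) -> bool {
        // If the last log is still the joint config, phase two never ran:
        // the new leader must drive the change-membership to completion.
        matches!(last_log, Payload::ConfigChange { in_joint_consensus: true })
    }
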
- let last_logs = - self.core.storage.get_log_entries(last_index..=last_index).await.map_err(RaftError::RaftStorage)?; + let last_logs = self + .core + .storage + .get_log_entries(last_index..=last_index) + .await + .map_err(|x| RaftError::RaftStorage(x.into()))?; let last_log = &last_logs[0]; let req = match last_log.payload { @@ -266,11 +271,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage }, payload, }; - self.core - .storage - .append_to_log(&[&entry]) - .await - .map_err(|err| self.core.map_fatal_storage_error(err))?; + self.core.storage.append_to_log(&[&entry]).await.map_err(|err| self.core.map_storage_error(err))?; self.core.last_log_id.index = entry.log_id.index; self.leader_report_metrics(); @@ -436,7 +437,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage .storage .get_log_entries(expected_next_index..index) .await - .map_err(|err| self.core.map_fatal_storage_error(err))?; + .map_err(|err| self.core.map_storage_error(err))?; if let Some(entry) = entries.last() { self.core.last_applied = entry.log_id; @@ -448,18 +449,18 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage .storage .apply_to_state_machine(&data_entries) .await - .map_err(|err| self.core.map_fatal_storage_error(err))?; + .map_err(|err| self.core.map_storage_error(err))?; } } // Apply this entry to the state machine and return its data response. let res = self.core.storage.apply_to_state_machine(&[entry]).await.map_err(|err| { - if err.downcast_ref::().is_some() { + if let StorageError::IO { .. } = err { // If this is an instance of the storage impl's shutdown error, then trigger shutdown. - self.core.map_fatal_storage_error(err) + self.core.map_storage_error(err) } else { // Else, we propagate normally. - RaftError::RaftStorage(err) + RaftError::RaftStorage(err.into()) } }); diff --git a/async-raft/src/core/install_snapshot.rs b/async-raft/src/core/install_snapshot.rs index a38420c26..891776346 100644 --- a/async-raft/src/core/install_snapshot.rs +++ b/async-raft/src/core/install_snapshot.rs @@ -111,8 +111,7 @@ impl, S: RaftStorage> Ra } // Create a new snapshot and begin writing its contents. - let mut snapshot = - self.storage.begin_receiving_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))?; + let mut snapshot = self.storage.begin_receiving_snapshot().await.map_err(|err| self.map_storage_error(err))?; snapshot.as_mut().write_all(&req.data).await?; // If this was a small snapshot, and it is already done, then finish up. @@ -199,7 +198,7 @@ impl, S: RaftStorage> Ra .storage .finalize_snapshot_installation(&req.meta, snapshot) .await - .map_err(|e| self.map_fatal_storage_error(e))?; + .map_err(|e| self.map_storage_error(e))?; tracing::debug!("update after apply or install-snapshot: {:?}", changes); @@ -216,8 +215,7 @@ impl, S: RaftStorage> Ra } // There could be unknown membership in the snapshot. - let membership = - self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?; + let membership = self.storage.get_membership_config().await.map_err(|err| self.map_storage_error(err))?; self.update_membership(membership)?; diff --git a/async-raft/src/core/mod.rs b/async-raft/src/core/mod.rs index 40553dcb6..0a56af1c2 100644 --- a/async-raft/src/core/mod.rs +++ b/async-raft/src/core/mod.rs @@ -56,6 +56,7 @@ use crate::MessageSummary; use crate::NodeId; use crate::RaftNetwork; use crate::RaftStorage; +use crate::StorageError; use crate::Update; /// The core type implementing the Raft protocol. 
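The `map_storage_error` helper introduced in the `core/mod.rs` hunks below replaces the old `map_fatal_storage_error`: every typed `StorageError` is now treated as fatal, logged, and wrapped into `RaftError::RaftStorage` through `anyhow`. A minimal sketch of that conversion chain, using stand-in types rather than the crate's real definitions:

    // Sketch: a typed storage error becoming the anyhow-based RaftError variant.
    #[derive(Debug, thiserror::Error)]
    #[error("storage failed: {0}")]
    struct StorageError(String); // stand-in for the PR's StorageError enum

    #[derive(Debug)]
    enum RaftError {
        RaftStorage(anyhow::Error), // same shape as the crate's existing variant
    }

    fn map_storage_error(err: StorageError) -> RaftError {
        // The real helper also logs the error and sets the target state to Shutdown.
        RaftError::RaftStorage(err.into()) // StorageError -> anyhow::Error via Into
    }
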
@@ -179,7 +180,7 @@ impl, S: RaftStorage> Ra async fn main(mut self) -> RaftResult<()> { tracing::debug!("raft node is initializing"); - let state = self.storage.get_initial_state().await.map_err(|err| self.map_fatal_storage_error(err))?; + let state = self.storage.get_initial_state().await.map_err(|err| self.map_storage_error(err))?; self.last_log_id = state.last_log_id; self.current_term = state.hard_state.current_term; self.voted_for = state.hard_state.voted_for; @@ -191,9 +192,7 @@ impl, S: RaftStorage> Ra self.commit_index = 0; // Fetch the most recent snapshot in the system. - if let Some(snapshot) = - self.storage.get_current_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))? - { + if let Some(snapshot) = self.storage.get_current_snapshot().await.map_err(|err| self.map_storage_error(err))? { self.snapshot_last_log_id = snapshot.meta.last_log_id; self.report_metrics(Update::Ignore); } @@ -286,7 +285,7 @@ impl, S: RaftStorage> Ra current_term: self.current_term, voted_for: self.voted_for, }; - self.storage.save_hard_state(&hs).await.map_err(|err| self.map_fatal_storage_error(err)) + self.storage.save_hard_state(&hs).await.map_err(|err| self.map_storage_error(err)) } /// Update core's target state, ensuring all invariants are upheld. @@ -367,6 +366,12 @@ impl, S: RaftStorage> Ra RaftError::RaftStorage(err) } + fn map_storage_error(&mut self, err: StorageError) -> RaftError { + tracing::error!({error=?err, id=self.id}, "fatal storage error, shutting down"); + self.set_target_state(State::Shutdown); + RaftError::RaftStorage(err.into()) + } + /// Update the node's current membership config & save hard state. #[tracing::instrument(level = "trace", skip(self))] fn update_membership(&mut self, cfg: MembershipConfig) -> RaftResult<()> { diff --git a/async-raft/src/core/replication.rs b/async-raft/src/core/replication.rs index c41627a70..821993866 100644 --- a/async-raft/src/core/replication.rs +++ b/async-raft/src/core/replication.rs @@ -329,12 +329,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage }; // Check for existence of current snapshot. - let current_snapshot_opt = self - .core - .storage - .get_current_snapshot() - .await - .map_err(|err| self.core.map_fatal_storage_error(err))?; + let current_snapshot_opt = + self.core.storage.get_current_snapshot().await.map_err(|err| self.core.map_storage_error(err))?; if let Some(snapshot) = current_snapshot_opt { // If snapshot exists, ensure its distance from the leader's last log index is <= half diff --git a/async-raft/src/error.rs b/async-raft/src/error.rs index ac2b16c5b..9dc5706d7 100644 --- a/async-raft/src/error.rs +++ b/async-raft/src/error.rs @@ -2,8 +2,6 @@ use std::fmt; -use thiserror::Error; - use crate::raft_types::SnapshotSegmentId; use crate::AppData; use crate::NodeId; @@ -12,7 +10,7 @@ use crate::NodeId; pub type RaftResult = std::result::Result; /// Error variants related to the internals of Raft. -#[derive(Debug, Error)] +#[derive(Debug, thiserror::Error)] #[non_exhaustive] pub enum RaftError { // Streaming-snapshot encountered mismatched snapshot_id/offset @@ -21,12 +19,15 @@ pub enum RaftError { expect: SnapshotSegmentId, got: SnapshotSegmentId, }, + /// An error which has come from the `RaftStorage` layer. #[error("{0}")] RaftStorage(anyhow::Error), + /// An error which has come from the `RaftNetwork` layer. #[error("{0}")] RaftNetwork(anyhow::Error), + /// An internal Raft error indicating that Raft is shutting down. 
#[error("Raft is shutting down")] ShuttingDown, @@ -39,7 +40,7 @@ impl From for RaftError { } /// An error related to a client read request. -#[derive(Debug, Error)] +#[derive(Debug, thiserror::Error)] pub enum ClientReadError { /// A Raft error. #[error("{0}")] @@ -50,7 +51,7 @@ pub enum ClientReadError { } /// An error related to a client write request. -#[derive(Error)] +#[derive(thiserror::Error)] pub enum ClientWriteError { /// A Raft error. #[error("{0}")] @@ -72,13 +73,14 @@ impl fmt::Debug for ClientWriteError { } /// Error variants related to configuration. -#[derive(Debug, Error, Eq, PartialEq)] +#[derive(Debug, thiserror::Error, Eq, PartialEq)] #[non_exhaustive] pub enum ConfigError { /// A configuration error indicating that the given values for election timeout min & max are invalid: max must be /// greater than min. #[error("given values for election timeout min & max are invalid: max must be greater than min")] InvalidElectionTimeoutMinMax, + /// The given value for max_payload_entries is too small, must be > 0. #[error("the given value for max_payload_entries is too small, must be > 0")] MaxPayloadEntriesTooSmall, @@ -90,19 +92,20 @@ pub enum ConfigError { } /// The set of errors which may take place when initializing a pristine Raft node. -#[derive(Debug, Error)] +#[derive(Debug, thiserror::Error)] #[non_exhaustive] pub enum InitializeError { /// An internal error has taken place. #[error("{0}")] RaftError(#[from] RaftError), + /// The requested action is not allowed due to the Raft node's current state. #[error("the requested action is not allowed due to the Raft node's current state")] NotAllowed, } /// The set of errors which may take place when requesting to propose a config change. -#[derive(Debug, Error)] +#[derive(Debug, thiserror::Error)] #[non_exhaustive] pub enum ChangeConfigError { /// An error related to the processing of the config change request. @@ -111,19 +114,23 @@ pub enum ChangeConfigError { /// to the Raft log and the process related to that workflow. #[error("{0}")] RaftError(#[from] RaftError), + /// The cluster is already undergoing a configuration change. #[error("the cluster is already undergoing a configuration change")] ConfigChangeInProgress, + /// The given config would leave the cluster in an inoperable state. /// /// This error will be returned if the full set of changes, once fully applied, would leave /// the cluster in an inoperable state. #[error("the given config would leave the cluster in an inoperable state")] InoperableConfig, + /// The node the config change proposal was sent to was not the leader of the cluster. The ID /// of the current leader is returned if known. #[error("this node is not the Raft leader")] NodeNotLeader(Option), + /// The proposed config changes would make no difference to the current config. /// /// This takes into account a current joint consensus and the end result of the config. @@ -141,7 +148,7 @@ impl From> for ChangeConfigError { } // A error wrapper of every type of error that will be sent to the caller. 
-#[derive(Debug, Error)] +#[derive(Debug, thiserror::Error)] #[non_exhaustive] pub enum ResponseError { #[error(transparent)] diff --git a/async-raft/src/lib.rs b/async-raft/src/lib.rs index 8d177bead..7d6d0a63d 100644 --- a/async-raft/src/lib.rs +++ b/async-raft/src/lib.rs @@ -1,4 +1,5 @@ #![doc = include_str!("../README.md")] +#![feature(backtrace)] pub mod config; mod core; @@ -12,6 +13,7 @@ pub mod raft; mod raft_types; mod replication; pub mod storage; +mod storage_error; mod summary; pub use async_trait; @@ -39,6 +41,12 @@ pub use crate::replication::ReplicationMetrics; pub use crate::storage::RaftStorage; pub use crate::storage::RaftStorageDebug; pub use crate::storage::SnapshotMeta; +pub use crate::storage_error::DefensiveError; +pub use crate::storage_error::ErrorSubject; +pub use crate::storage_error::ErrorVerb; +pub use crate::storage_error::StorageError; +pub use crate::storage_error::StorageIOError; +pub use crate::storage_error::Violation; pub use crate::summary::MessageSummary; /// A Raft node's ID. diff --git a/async-raft/src/storage.rs b/async-raft/src/storage.rs index 6c1f2a8be..0557f0726 100644 --- a/async-raft/src/storage.rs +++ b/async-raft/src/storage.rs @@ -1,10 +1,8 @@ //! The Raft storage interface and data types. -use std::error::Error; use std::fmt::Debug; use std::ops::RangeBounds; -use anyhow::Result; use async_trait::async_trait; use serde::Deserialize; use serde::Serialize; @@ -20,8 +18,9 @@ use crate::AppData; use crate::AppDataResponse; use crate::LogId; use crate::NodeId; +use crate::StorageError; -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct SnapshotMeta { // Log entries upto which this snapshot includes, inclusive. pub last_log_id: LogId, @@ -106,15 +105,6 @@ where /// for details on where and how this is used. type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static; - /// The error type used to indicate to Raft that shutdown is needed when calling the - /// `apply_entry_to_state_machine` method. - /// - /// This error type is only considered for the `apply_entry_to_state_machine` method as it is - /// the only method which is allowed to return errors normally as part of application logic. - /// - /// For all other methods of this trait, returning an error will cause Raft to shutdown. - type ShutdownError: Error + Send + Sync + 'static; - /// Set if to turn on defensive check to unexpected input. /// E.g. discontinuous log appending. /// The default impl returns `false` to indicate it does impl any defensive check. @@ -135,7 +125,7 @@ where /// the node's ID so that it is consistent across restarts. /// /// Errors returned from this method will cause Raft to go into shutdown. - async fn get_membership_config(&self) -> Result; + async fn get_membership_config(&self) -> Result; /// Get Raft's state information from storage. /// @@ -148,12 +138,12 @@ where /// the node's hard state record; and the index of the last log applied to the state machine. /// /// Errors returned from this method will cause Raft to go into shutdown. - async fn get_initial_state(&self) -> Result; + async fn get_initial_state(&self) -> Result; /// Save Raft's hard-state. /// /// Errors returned from this method will cause Raft to go into shutdown. - async fn save_hard_state(&self, hs: &HardState) -> Result<()>; + async fn save_hard_state(&self, hs: &HardState) -> Result<(), StorageError>; /// Get a series of log entries from storage. 
/// @@ -163,11 +153,11 @@ where async fn get_log_entries + Clone + Debug + Send + Sync>( &self, range: RNG, - ) -> Result>>; + ) -> Result>, StorageError>; /// Try to get an log entry. /// It does not return an error if in defensive mode and the log entry at `log_index` is not found. - async fn try_get_log_entry(&self, log_index: u64) -> Result>>; + async fn try_get_log_entry(&self, log_index: u64) -> Result>, StorageError>; /// Returns the last known log id. /// It could be the id of the last entry in log, or the last applied id that is saved in state machine. @@ -186,12 +176,15 @@ where /// /// TODO(xp) test it /// TODO(xp) defensive test about consistency - async fn get_last_log_id(&self) -> Result; + async fn get_last_log_id(&self) -> Result; /// Delete all logs in a `range`. /// /// Errors returned from this method will cause Raft to go into shutdown. - async fn delete_logs_from + Clone + Debug + Send + Sync>(&self, range: RNG) -> Result<()>; + async fn delete_logs_from + Clone + Debug + Send + Sync>( + &self, + range: RNG, + ) -> Result<(), StorageError>; /// Append a payload of entries to the log. /// @@ -199,7 +192,7 @@ where /// determine its location to be written in the log. /// /// Errors returned from this method will cause Raft to go into shutdown. - async fn append_to_log(&self, entries: &[&Entry]) -> Result<()>; + async fn append_to_log(&self, entries: &[&Entry]) -> Result<(), StorageError>; /// Apply the given payload of entries to the state machine. /// @@ -216,15 +209,8 @@ where /// - Deal with EntryPayload::ConfigChange /// - A EntryPayload::SnapshotPointer log should never be seen. /// - /// TODO(xp): choose one of the following policy: - /// Error handling for this method is note worthy. If an error is returned from a call to this - /// method, the error will be inspected, and if the error is an instance of - /// `RaftStorage::ShutdownError`, then Raft will go into shutdown in order to preserve the - /// safety of the data and avoid corruption. Any other errors will be propagated back up to the - /// `Raft.client_write` call point. - /// /// Errors returned from this method will cause Raft to go into shutdown. - async fn apply_to_state_machine(&self, entries: &[&Entry]) -> Result>; + async fn apply_to_state_machine(&self, entries: &[&Entry]) -> Result, StorageError>; /// Perform log compaction, returning a handle to the generated snapshot. /// @@ -236,7 +222,7 @@ where /// log covered by the snapshot. /// /// Errors returned from this method will be logged and retried. - async fn do_log_compaction(&self) -> Result>; + async fn do_log_compaction(&self) -> Result, StorageError>; /// Create a new blank snapshot, returning a writable handle to the snapshot object. /// @@ -247,7 +233,7 @@ where /// for details on log compaction / snapshotting. /// /// Errors returned from this method will cause Raft to go into shutdown. - async fn begin_receiving_snapshot(&self) -> Result>; + async fn begin_receiving_snapshot(&self) -> Result, StorageError>; /// Finalize the installation of a snapshot which has finished streaming from the cluster leader. /// @@ -270,7 +256,7 @@ where &self, meta: &SnapshotMeta, snapshot: Box, - ) -> Result; + ) -> Result; /// Get a readable handle to the current snapshot, along with its metadata. /// @@ -285,7 +271,7 @@ where /// of the snapshot, which should be decoded for creating this method's response data. /// /// Errors returned from this method will cause Raft to go into shutdown. 
-    async fn get_current_snapshot(&self) -> Result<Option<Snapshot<Self::SnapshotData>>>;
+    async fn get_current_snapshot(&self) -> Result<Option<Snapshot<Self::SnapshotData>>, StorageError>;
 }

 /// APIs for debugging a store.
diff --git a/async-raft/src/storage_error.rs b/async-raft/src/storage_error.rs
new file mode 100644
index 000000000..7f46dc2a2
--- /dev/null
+++ b/async-raft/src/storage_error.rs
@@ -0,0 +1,169 @@
+use std::backtrace::Backtrace;
+use std::fmt::Formatter;
+use std::ops::Bound;
+
+use crate::storage::HardState;
+use crate::LogId;
+use crate::SnapshotMeta;
+
+/// An error that occurs when the RaftStore impl runs defensive checks on input or output.
+/// E.g. re-applying a log entry is a violation that may be a potential bug.
+#[derive(thiserror::Error, Debug)]
+pub struct DefensiveError {
+    /// The subject that violates the store's defensive check, e.g. hard-state, log or state machine.
+    pub subject: ErrorSubject,
+
+    /// The description of the violation.
+    pub violation: Violation,
+
+    pub backtrace: Backtrace,
+}
+
+impl DefensiveError {
+    pub fn new(subject: ErrorSubject, violation: Violation) -> DefensiveError {
+        DefensiveError {
+            subject,
+            violation,
+            backtrace: Backtrace::capture(),
+        }
+    }
+}
+
+impl std::fmt::Display for DefensiveError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "'{:?}' violates: '{}'", self.subject, self.violation)
+    }
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub enum ErrorSubject {
+    /// A general storage error
+    Store,
+
+    /// HardState related error.
+    HardState,
+
+    /// Error that happened when operating on a series of log entries
+    Logs,
+
+    /// Error about a single log entry
+    Log(LogId),
+
+    /// Error about a single log entry without knowing the log term.
+    LogIndex(u64),
+
+    /// Error that happened when applying a log entry
+    Apply(LogId),
+
+    /// Error that happened when operating the state machine.
+    StateMachine,
+
+    /// Error that happened when operating a snapshot.
+    Snapshot(SnapshotMeta),
+
+    None,
+}
+
+/// What the store was doing when an error occurred.
+#[derive(Debug)]
+pub enum ErrorVerb {
+    Read,
+    Write,
+    Delete,
+}
+
+/// Violations a store would return when running defensive checks.
+#[derive(Debug, thiserror::Error, PartialEq, Eq)]
+pub enum Violation {
+    #[error("term can only be changed to a greater value, current: {curr}, change to {to}")]
+    TermNotAscending { curr: u64, to: u64 },
+
+    #[error("voted_for can not change from Some() to another Some(), current: {curr:?}, change to {to:?}")]
+    VotedForChanged { curr: HardState, to: HardState },
+
+    #[error("log at higher index is obsolete: {higher_index_log_id:?} should GT {lower_index_log_id:?}")]
+    DirtyLog {
+        higher_index_log_id: LogId,
+        lower_index_log_id: LogId,
+    },
+
+    #[error("try to get log at index {want} but got {got:?}")]
+    LogIndexNotFound { want: u64, got: Option<u64> },
+
+    #[error("range is empty: start: {start:?}, end: {end:?}")]
+    RangeEmpty { start: Option<u64>, end: Option<u64> },
+
+    #[error("range is not half-open: start: {start:?}, end: {end:?}")]
+    RangeNotHalfOpen { start: Bound<u64>, end: Bound<u64> },
+
+    #[error("empty log vector")]
+    LogsEmpty,
+
+    #[error("logs are not consecutive, prev: {prev}, next: {next}")]
+    LogsNonConsecutive { prev: LogId, next: LogId },
+
+    #[error("invalid next log to apply: prev: {prev}, next: {next}")]
+    ApplyNonConsecutive { prev: LogId, next: LogId },
+}
+
+/// A storage error could be either a defensive-check error or an error that occurred when doing the actual IO operation.
+#[derive(Debug, thiserror::Error)]
+pub enum StorageError {
+    /// An error raised by a defensive check.
+    #[error(transparent)]
+    Defensive {
+        #[from]
+        #[backtrace]
+        source: DefensiveError,
+    },
+
+    /// An error raised by an IO operation.
+    #[error(transparent)]
+    IO {
+        #[from]
+        #[backtrace]
+        source: StorageIOError,
+    },
+}
+
+impl StorageError {
+    pub fn into_defensive(self) -> Option<DefensiveError> {
+        match self {
+            StorageError::Defensive { source } => Some(source),
+            _ => None,
+        }
+    }
+
+    pub fn into_io(self) -> Option<StorageIOError> {
+        match self {
+            StorageError::IO { source } => Some(source),
+            _ => None,
+        }
+    }
+}
+
+/// An error that occurs when operating the store.
+#[derive(Debug, thiserror::Error)]
+pub struct StorageIOError {
+    subject: ErrorSubject,
+    verb: ErrorVerb,
+    source: anyhow::Error,
+    backtrace: Backtrace,
+}
+
+impl std::fmt::Display for StorageIOError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "when {:?} {:?}: {}", self.verb, self.subject, self.source)
+    }
+}
+
+impl StorageIOError {
+    pub fn new(subject: ErrorSubject, verb: ErrorVerb, source: anyhow::Error) -> StorageIOError {
+        StorageIOError {
+            subject,
+            verb,
+            source,
+            backtrace: Backtrace::capture(),
+        }
+    }
+}
diff --git a/guide/src/putting-it-all-together.md b/guide/src/putting-it-all-together.md
index a994addb0..aa707eb80 100644
--- a/guide/src/putting-it-all-together.md
+++ b/guide/src/putting-it-all-together.md
@@ -74,7 +74,6 @@ use anyhow::Result;
 #[async_trait]
 impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
     type Snapshot = Cursor<Vec<u8>>;
-    type ShutdownError = ShutdownError;

     async fn get_membership_config(&self) -> Result<MembershipConfig> {
         // ... snip ...
diff --git a/memstore/Cargo.toml b/memstore/Cargo.toml
index 45443f3ce..1ba5b508f 100644
--- a/memstore/Cargo.toml
+++ b/memstore/Cargo.toml
@@ -18,7 +18,6 @@ async-raft = { version="0.6", path="../async-raft" }
 async-trait = "0.1.36"
 serde = { version="1.0.114", features=["derive"] }
 serde_json = "1.0.57"
-thiserror = "1.0.20"
 tokio = { version="1.0", default-features=false, features=["sync"] }
 tracing = "0.1.17"
 tracing-futures = "0.2.4"
diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs
index 9122ebc5c..858e53025 100644
--- a/memstore/src/lib.rs
+++ b/memstore/src/lib.rs
@@ -1,4 +1,6 @@
 #![doc = include_str!("../README.md")]
+#![feature(backtrace)]
+#![feature(bound_cloned)]

 #[cfg(test)]
 mod test;
@@ -13,7 +15,6 @@ use std::ops::RangeBounds;
 use std::sync::Arc;
 use std::sync::Mutex;

-use anyhow::Result;
 use async_raft::async_trait::async_trait;
 use async_raft::raft::Entry;
 use async_raft::raft::EntryPayload;
@@ -23,15 +24,20 @@ use async_raft::storage::InitialState;
 use async_raft::storage::Snapshot;
 use async_raft::AppData;
 use async_raft::AppDataResponse;
+use async_raft::DefensiveError;
+use async_raft::ErrorSubject;
+use async_raft::ErrorVerb;
 use async_raft::LogId;
 use async_raft::NodeId;
 use async_raft::RaftStorage;
 use async_raft::RaftStorageDebug;
 use async_raft::SnapshotMeta;
 use async_raft::StateMachineChanges;
+use async_raft::StorageError;
+use async_raft::StorageIOError;
+use async_raft::Violation;
 use serde::Deserialize;
 use serde::Serialize;
-use thiserror::Error;
 use tokio::sync::RwLock;

 /// The application data request type which the `MemStore` works with.
@@ -58,13 +64,6 @@ pub struct ClientResponse(Option<String>);

 impl AppDataResponse for ClientResponse {}

-/// Error used to trigger Raft shutdown from storage.
-#[derive(Clone, Debug, Error)]
-pub enum ShutdownError {
-    #[error("unsafe storage error")]
-    UnsafeStorageError,
-}
-
 /// The application snapshot type which the `MemStore` works with.
#[derive(Serialize, Deserialize, Debug, Clone)] pub struct MemStoreSnapshot { @@ -153,7 +152,7 @@ impl MemStore { impl MemStore { /// Ensure that logs that have greater index than last_applied should have greater log_id. /// Invariant must hold: `log.log_id.index > last_applied.index` implies `log.log_id > last_applied`. - pub async fn defensive_no_dirty_log(&self) -> anyhow::Result<()> { + pub async fn defensive_no_dirty_log(&self) -> Result<(), DefensiveError> { if !*self.defensive.read().await { return Ok(()); } @@ -164,7 +163,13 @@ impl MemStore { let last_applied = sm.last_applied_log; if last_log_id.index > last_applied.index && last_log_id < last_applied { - return Err(anyhow::anyhow!("greater index log is smaller than last_applied")); + return Err(DefensiveError::new( + ErrorSubject::Log(last_log_id), + Violation::DirtyLog { + higher_index_log_id: last_log_id, + lower_index_log_id: last_applied, + }, + )); } Ok(()) @@ -172,7 +177,7 @@ impl MemStore { /// Ensure that current_term must increment for every update, and for every term there could be only one value for /// voted_for. - pub async fn defensive_incremental_hard_state(&self, hs: &HardState) -> anyhow::Result<()> { + pub async fn defensive_incremental_hard_state(&self, hs: &HardState) -> Result<(), DefensiveError> { if !*self.defensive.read().await { return Ok(()); } @@ -180,22 +185,26 @@ impl MemStore { let h = self.hs.write().await; let curr = h.clone().unwrap_or_default(); if hs.current_term < curr.current_term { - return Err(anyhow::anyhow!("smaller term is now allowed")); + return Err(DefensiveError::new( + ErrorSubject::HardState, + Violation::TermNotAscending { + curr: curr.current_term, + to: hs.current_term, + }, + )); } if hs.current_term == curr.current_term && curr.voted_for.is_some() && hs.voted_for != curr.voted_for { - return Err(anyhow::anyhow!( - "voted_for can not change in one term({}) curr: {:?} change to {:?}", - hs.current_term, - curr.voted_for, - hs.voted_for + return Err(DefensiveError::new( + ErrorSubject::HardState, + Violation::VotedForChanged { curr, to: hs.clone() }, )); } Ok(()) } - pub async fn defensive_consecutive_input(&self, entries: &[&Entry]) -> anyhow::Result<()> { + pub async fn defensive_consecutive_input(&self, entries: &[&Entry]) -> Result<(), DefensiveError> { if !*self.defensive.read().await { return Ok(()); } @@ -208,11 +217,10 @@ impl MemStore { for e in entries.iter().skip(1) { if e.log_id.index != prev_log_id.index + 1 { - return Err(anyhow::anyhow!( - "nonconsecutive input log index: {}, {}", - prev_log_id, - e.log_id - )); + return Err(DefensiveError::new(ErrorSubject::Logs, Violation::LogsNonConsecutive { + prev: prev_log_id, + next: e.log_id, + })); } prev_log_id = e.log_id; @@ -221,13 +229,13 @@ impl MemStore { Ok(()) } - pub async fn defensive_nonempty_input(&self, entries: &[&Entry]) -> anyhow::Result<()> { + pub async fn defensive_nonempty_input(&self, entries: &[&Entry]) -> Result<(), DefensiveError> { if !*self.defensive.read().await { return Ok(()); } if entries.is_empty() { - return Err(anyhow::anyhow!("append empty entries")); + return Err(DefensiveError::new(ErrorSubject::Logs, Violation::LogsEmpty)); } Ok(()) @@ -236,7 +244,7 @@ impl MemStore { pub async fn defensive_append_log_index_is_last_plus_one( &self, entries: &[&Entry], - ) -> anyhow::Result<()> { + ) -> Result<(), DefensiveError> { if !*self.defensive.read().await { return Ok(()); } @@ -245,17 +253,22 @@ impl MemStore { let first_id = entries[0].log_id; if last_id.index + 1 != first_id.index { - return 
Err(anyhow::anyhow!( - "first input log index({}) is not last({}) + 1", - first_id.index, - last_id.index, + return Err(DefensiveError::new( + ErrorSubject::Log(first_id), + Violation::LogsNonConsecutive { + prev: last_id, + next: first_id, + }, )); } Ok(()) } - pub async fn defensive_append_log_id_gt_last(&self, entries: &[&Entry]) -> anyhow::Result<()> { + pub async fn defensive_append_log_id_gt_last( + &self, + entries: &[&Entry], + ) -> Result<(), DefensiveError> { if !*self.defensive.read().await { return Ok(()); } @@ -264,10 +277,12 @@ impl MemStore { let first_id = entries[0].log_id; if first_id < last_id { - return Err(anyhow::anyhow!( - "first input log id({}) is not > last id({})", - first_id, - last_id, + return Err(DefensiveError::new( + ErrorSubject::Log(first_id), + Violation::LogsNonConsecutive { + prev: last_id, + next: first_id, + }, )); } @@ -287,7 +302,7 @@ impl MemStore { std::cmp::max(log_last_id, sm_last_id) } - pub async fn defensive_consistent_log_sm(&self) -> anyhow::Result<()> { + pub async fn defensive_consistent_log_sm(&self) -> Result<(), DefensiveError> { let log_last_id = { let log_last = self.log.read().await; log_last.iter().last().map(|(_k, v)| v.log_id).unwrap_or_default() @@ -298,10 +313,12 @@ impl MemStore { if (log_last_id.index == sm_last_id.index && log_last_id != sm_last_id) || (log_last_id.index > sm_last_id.index && log_last_id < sm_last_id) { - return Err(anyhow::anyhow!( - "inconsistent log.last({}) and sm.last_applied({})", - log_last_id, - sm_last_id + return Err(DefensiveError::new( + ErrorSubject::Log(log_last_id), + Violation::DirtyLog { + higher_index_log_id: log_last_id, + lower_index_log_id: sm_last_id, + }, )); } @@ -311,7 +328,7 @@ impl MemStore { pub async fn defensive_apply_index_is_last_applied_plus_one( &self, entries: &[&Entry], - ) -> anyhow::Result<()> { + ) -> Result<(), DefensiveError> { if !*self.defensive.read().await { return Ok(()); } @@ -320,10 +337,12 @@ impl MemStore { let first_id = entries[0].log_id; if last_id.index + 1 != first_id.index { - return Err(anyhow::anyhow!( - "first input log index({}) is not last({}) + 1", - first_id.index, - last_id.index, + return Err(DefensiveError::new( + ErrorSubject::Apply(first_id), + Violation::ApplyNonConsecutive { + prev: last_id, + next: first_id, + }, )); } @@ -333,7 +352,7 @@ impl MemStore { pub async fn defensive_nonempty_range + Clone + Debug + Send>( &self, range: RNG, - ) -> anyhow::Result<()> { + ) -> Result<(), DefensiveError> { if !*self.defensive.read().await { return Ok(()); } @@ -354,7 +373,10 @@ impl MemStore { } if start > end { - return Err(anyhow::anyhow!("range must be nonempty: {:?}", range)); + return Err(DefensiveError::new(ErrorSubject::Logs, Violation::RangeEmpty { + start, + end, + })); } Ok(()) @@ -364,7 +386,7 @@ impl MemStore { pub async fn defensive_half_open_range + Clone + Debug + Send>( &self, range: RNG, - ) -> anyhow::Result<()> { + ) -> Result<(), DefensiveError> { if !*self.defensive.read().await { return Ok(()); } @@ -377,14 +399,17 @@ impl MemStore { return Ok(()); }; - Err(anyhow::anyhow!("range must be at least half open: {:?}", range)) + Err(DefensiveError::new(ErrorSubject::Logs, Violation::RangeNotHalfOpen { + start: range.start_bound().cloned(), + end: range.end_bound().cloned(), + })) } pub async fn defensive_range_hits_logs + Debug + Send>( &self, range: RNG, logs: &[Entry], - ) -> anyhow::Result<()> { + ) -> Result<(), DefensiveError> { if !*self.defensive.read().await { return Ok(()); } @@ -399,11 +424,12 @@ impl MemStore { let 
first = logs.first().map(|x| x.log_id.index); if want_first.is_some() && first != want_first { - return Err(anyhow::anyhow!( - "{:?} want first: {:?}, but {:?}", - range, - want_first, - first + return Err(DefensiveError::new( + ErrorSubject::LogIndex(want_first.unwrap()), + Violation::LogIndexNotFound { + want: want_first.unwrap(), + got: first, + }, )); } } @@ -418,11 +444,12 @@ impl MemStore { let last = logs.last().map(|x| x.log_id.index); if want_last.is_some() && last != want_last { - return Err(anyhow::anyhow!( - "{:?} want last: {:?}, but {:?}", - range, - want_last, - last + return Err(DefensiveError::new( + ErrorSubject::LogIndex(want_last.unwrap()), + Violation::LogIndexNotFound { + want: want_last.unwrap(), + got: last, + }, )); } } @@ -430,7 +457,10 @@ impl MemStore { Ok(()) } - pub async fn defensive_apply_log_id_gt_last(&self, entries: &[&Entry]) -> anyhow::Result<()> { + pub async fn defensive_apply_log_id_gt_last( + &self, + entries: &[&Entry], + ) -> Result<(), DefensiveError> { if !*self.defensive.read().await { return Ok(()); } @@ -439,10 +469,12 @@ impl MemStore { let first_id = entries[0].log_id; if first_id < last_id { - return Err(anyhow::anyhow!( - "first input log id({}) is not > last id({})", - first_id, - last_id, + return Err(DefensiveError::new( + ErrorSubject::Apply(first_id), + Violation::ApplyNonConsecutive { + prev: last_id, + next: first_id, + }, )); } @@ -477,7 +509,7 @@ impl MemStore { /// Go backwards through the log to find the most recent membership config <= `upto_index`. #[tracing::instrument(level = "trace", skip(self))] - pub async fn get_membership_from_log(&self, upto_index: Option) -> Result { + pub async fn get_membership_from_log(&self, upto_index: Option) -> Result { self.defensive_no_dirty_log().await?; let membership = { @@ -523,7 +555,6 @@ impl MemStore { #[async_trait] impl RaftStorage for MemStore { type SnapshotData = Cursor>; - type ShutdownError = ShutdownError; async fn defensive(&self, d: bool) -> bool { let mut defensive_flag = self.defensive.write().await; @@ -532,12 +563,12 @@ impl RaftStorage for MemStore { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_membership_config(&self) -> Result { + async fn get_membership_config(&self) -> Result { self.get_membership_from_log(None).await } #[tracing::instrument(level = "trace", skip(self))] - async fn get_initial_state(&self) -> Result { + async fn get_initial_state(&self) -> Result { self.defensive_no_dirty_log().await?; let membership = self.get_membership_config().await?; @@ -576,7 +607,7 @@ impl RaftStorage for MemStore { } #[tracing::instrument(level = "debug", skip(self))] - async fn save_hard_state(&self, hs: &HardState) -> Result<()> { + async fn save_hard_state(&self, hs: &HardState) -> Result<(), StorageError> { self.defensive_incremental_hard_state(hs).await?; let mut h = self.hs.write().await; @@ -589,7 +620,7 @@ impl RaftStorage for MemStore { async fn get_log_entries + Clone + Debug + Send + Sync>( &self, range: RNG, - ) -> Result>> { + ) -> Result>, StorageError> { self.defensive_nonempty_range(range.clone()).await?; let res = { @@ -603,14 +634,15 @@ impl RaftStorage for MemStore { } #[tracing::instrument(level = "trace", skip(self))] - async fn try_get_log_entry(&self, log_index: u64) -> Result>> { + async fn try_get_log_entry(&self, log_index: u64) -> Result>, StorageError> { let log = self.log.read().await; Ok(log.get(&log_index).cloned()) } #[tracing::instrument(level = "trace", skip(self))] - async fn get_last_log_id(&self) -> Result { + 
async fn get_last_log_id(&self) -> Result { self.defensive_consistent_log_sm().await?; + // TODO: log id must consistent: let log_last_id = self.log.read().await.iter().last().map(|(_k, v)| v.log_id).unwrap_or_default(); let last_applied_id = self.sm.read().await.last_applied_log; @@ -619,7 +651,10 @@ impl RaftStorage for MemStore { } #[tracing::instrument(level = "trace", skip(self, range), fields(range=?range))] - async fn delete_logs_from + Clone + Debug + Send + Sync>(&self, range: R) -> Result<()> { + async fn delete_logs_from + Clone + Debug + Send + Sync>( + &self, + range: R, + ) -> Result<(), StorageError> { self.defensive_nonempty_range(range.clone()).await?; self.defensive_half_open_range(range.clone()).await?; @@ -634,7 +669,7 @@ impl RaftStorage for MemStore { } #[tracing::instrument(level = "trace", skip(self, entries))] - async fn append_to_log(&self, entries: &[&Entry]) -> Result<()> { + async fn append_to_log(&self, entries: &[&Entry]) -> Result<(), StorageError> { self.defensive_nonempty_input(entries).await?; self.defensive_consecutive_input(entries).await?; self.defensive_append_log_index_is_last_plus_one(entries).await?; @@ -648,7 +683,10 @@ impl RaftStorage for MemStore { } #[tracing::instrument(level = "trace", skip(self, entries))] - async fn apply_to_state_machine(&self, entries: &[&Entry]) -> Result> { + async fn apply_to_state_machine( + &self, + entries: &[&Entry], + ) -> Result, StorageError> { self.defensive_nonempty_input(entries).await?; self.defensive_apply_index_is_last_applied_plus_one(entries).await?; self.defensive_apply_log_id_gt_last(entries).await?; @@ -685,13 +723,14 @@ impl RaftStorage for MemStore { } #[tracing::instrument(level = "trace", skip(self))] - async fn do_log_compaction(&self) -> Result> { + async fn do_log_compaction(&self) -> Result, StorageError> { let (data, last_applied_log); let membership_config; { // Serialize the data of the state machine. let sm = self.sm.read().await; - data = serde_json::to_vec(&*sm)?; + data = serde_json::to_vec(&*sm) + .map_err(|e| StorageIOError::new(ErrorSubject::StateMachine, ErrorVerb::Read, e.into()))?; last_applied_log = sm.last_applied_log; membership_config = sm.last_membership.clone().unwrap_or_else(|| MembershipConfig::new_initial(self.id)); } // Release state machine read lock. @@ -736,7 +775,7 @@ impl RaftStorage for MemStore { } #[tracing::instrument(level = "trace", skip(self))] - async fn begin_receiving_snapshot(&self) -> Result> { + async fn begin_receiving_snapshot(&self) -> Result, StorageError> { Ok(Box::new(Cursor::new(Vec::new()))) } @@ -745,7 +784,7 @@ impl RaftStorage for MemStore { &self, meta: &SnapshotMeta, snapshot: Box, - ) -> Result { + ) -> Result { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, "decoding snapshot for installation" @@ -774,7 +813,13 @@ impl RaftStorage for MemStore { // Update the state machine. 
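The hunk just below applies the same wrapping idiom used in `do_log_compaction` above: a `serde_json` failure becomes a `StorageIOError` carrying a subject and a verb, while `DefensiveError` values produced by the `defensive_*` helpers reach `StorageError` automatically through the `#[from]` conversion and `?`. A condensed sketch of both paths (the `demo` and `check_invariants` names are hypothetical):

    // Sketch: the two ways errors become StorageError in this impl.
    fn demo(data: &[u8]) -> Result<(), StorageError> {
        // 1) Defensive checks: DefensiveError -> StorageError via `#[from]` and `?`.
        check_invariants()?;

        // 2) IO-ish failures: wrapped by hand with a subject and a verb.
        let _sm: serde_json::Value = serde_json::from_slice(data)
            .map_err(|e| StorageIOError::new(ErrorSubject::StateMachine, ErrorVerb::Read, e.into()))?;

        Ok(())
    }

    fn check_invariants() -> Result<(), DefensiveError> {
        Ok(()) // stand-in for defensive_no_dirty_log() and friends
    }
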
{ - let new_sm: MemStoreStateMachine = serde_json::from_slice(&new_snapshot.data)?; + let new_sm: MemStoreStateMachine = serde_json::from_slice(&new_snapshot.data).map_err(|e| { + StorageIOError::new( + ErrorSubject::Snapshot(new_snapshot.meta.clone()), + ErrorVerb::Read, + e.into(), + ) + })?; let mut sm = self.sm.write().await; *sm = new_sm; } @@ -789,7 +834,7 @@ impl RaftStorage for MemStore { } #[tracing::instrument(level = "trace", skip(self))] - async fn get_current_snapshot(&self) -> Result>> { + async fn get_current_snapshot(&self) -> Result>, StorageError> { match &*self.current_snapshot.read().await { Some(snapshot) => { // TODO(xp): try not to clone the entire data. diff --git a/memstore/src/test.rs b/memstore/src/test.rs index a42d61026..d305cfc20 100644 --- a/memstore/src/test.rs +++ b/memstore/src/test.rs @@ -61,14 +61,14 @@ where } #[test] -pub fn test_mem_store() -> Result<()> { +pub fn test_mem_store() -> anyhow::Result<()> { Suite::test_store(&MemStoreBuilder {})?; Ok(()) } #[test] -pub fn test_mem_store_defensive() -> Result<()> { +pub fn test_mem_store_defensive() -> anyhow::Result<()> { Suite::test_store_defensive(&DefensiveBuilder { inner: MemStoreBuilder {}, d: std::marker::PhantomData, @@ -81,7 +81,7 @@ pub fn test_mem_store_defensive() -> Result<()> { /// Block until a future is finished. /// The future will be running in a clean tokio runtime, to prevent an unfinished task affecting the test. -pub fn run_fut(f: F) -> Result<()> +pub fn run_fut(f: F) -> anyhow::Result<()> where F: Future> { let rt = tokio::runtime::Runtime::new()?; rt.block_on(f)?; @@ -103,7 +103,7 @@ where S: RaftStorageDebug + RaftStorage, B: StoreBuilder, { - fn test_store(builder: &B) -> Result<()> { + fn test_store(builder: &B) -> anyhow::Result<()> { run_fut(Suite::get_membership_config_default(builder))?; run_fut(Suite::get_membership_config_from_log_and_sm(builder))?; run_fut(Suite::get_initial_state_default(builder))?; @@ -123,7 +123,7 @@ where Ok(()) } - pub async fn get_membership_config_default(builder: &B) -> Result<()> { + pub async fn get_membership_config_default(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; let membership = store.get_membership_config().await?; @@ -139,7 +139,7 @@ where Ok(()) } - pub async fn get_membership_config_from_log_and_sm(builder: &B) -> Result<()> { + pub async fn get_membership_config_from_log_and_sm(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; tracing::info!("--- no log, read membership from state machine"); @@ -226,7 +226,7 @@ where Ok(()) } - pub async fn get_initial_state_default(builder: &B) -> Result<()> { + pub async fn get_initial_state_default(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; let expected_hs = HardState { @@ -262,7 +262,7 @@ where Ok(()) } - pub async fn get_initial_state_with_state(builder: &B) -> Result<()> { + pub async fn get_initial_state_with_state(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; Self::default_hard_state(&store).await?; @@ -303,7 +303,7 @@ where Ok(()) } - pub async fn get_initial_state_membership_from_log_and_sm(builder: &B) -> Result<()> { + pub async fn get_initial_state_membership_from_log_and_sm(builder: &B) -> anyhow::Result<()> { // It should never return membership from logs that are included in state machine present. 
let store = builder.new_store(NODE_ID).await; @@ -395,7 +395,7 @@ where Ok(()) } - pub async fn get_initial_state_last_log_gt_sm(builder: &B) -> Result<()> { + pub async fn get_initial_state_last_log_gt_sm(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; Self::default_hard_state(&store).await?; @@ -429,7 +429,7 @@ where Ok(()) } - pub async fn get_initial_state_last_log_lt_sm(builder: &B) -> Result<()> { + pub async fn get_initial_state_last_log_lt_sm(builder: &B) -> anyhow::Result<()> { // TODO(xp): check membership: read from log first, then state machine then default. let store = builder.new_store(NODE_ID).await; Self::default_hard_state(&store).await?; @@ -458,7 +458,7 @@ where Ok(()) } - pub async fn save_hard_state(builder: &B) -> Result<()> { + pub async fn save_hard_state(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; store @@ -480,7 +480,7 @@ where Ok(()) } - pub async fn get_log_entries(builder: &B) -> Result<()> { + pub async fn get_log_entries(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; @@ -502,7 +502,7 @@ where Ok(()) } - pub async fn try_get_log_entry(builder: &B) -> Result<()> { + pub async fn try_get_log_entry(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; @@ -518,7 +518,7 @@ where Ok(()) } - pub async fn get_last_log_id(builder: &B) -> Result<()> { + pub async fn get_last_log_id(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; let log_id = store.get_last_log_id().await?; @@ -590,7 +590,7 @@ where Ok(()) } - pub async fn delete_logs_from(builder: &B) -> Result<()> { + pub async fn delete_logs_from(builder: &B) -> anyhow::Result<()> { tracing::info!("--- delete start == stop"); { let store = builder.new_store(NODE_ID).await; @@ -639,7 +639,7 @@ where Ok(()) } - pub async fn append_to_log(builder: &B) -> Result<()> { + pub async fn append_to_log(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; @@ -658,7 +658,7 @@ where Ok(()) } - pub async fn apply_single(builder: &B) -> Result<()> { + pub async fn apply_single(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; let entry = Entry { @@ -696,7 +696,7 @@ where Ok(()) } - pub async fn apply_multi(builder: &B) -> Result<()> { + pub async fn apply_multi(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; let req0 = ClientRequest { @@ -803,7 +803,7 @@ where S: RaftStorageDebug + RaftStorage, B: StoreBuilder, { - fn test_store_defensive(builder: &B) -> Result<()> { + fn test_store_defensive(builder: &B) -> anyhow::Result<()> { run_fut(Suite::df_get_membership_config_dirty_log(builder))?; run_fut(Suite::df_get_initial_state_dirty_log(builder))?; run_fut(Suite::df_save_hard_state_ascending(builder))?; @@ -823,7 +823,7 @@ where Ok(()) } - pub async fn df_get_membership_config_dirty_log(builder: &B) -> Result<()> { + pub async fn df_get_membership_config_dirty_log(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; tracing::info!("--- dirty log: log.index > last_applied.index && log < last_applied"); @@ -867,14 +867,23 @@ where ]) .await?; - let mem = store.get_membership_config().await; - assert!(mem.is_err()); + let res = store.get_membership_config().await; + + let e = 
res.unwrap_err().into_defensive().unwrap(); + assert!(matches!(e, DefensiveError { + subject: ErrorSubject::Log(LogId { term: 1, index: 3 }), + violation: Violation::DirtyLog { + higher_index_log_id: LogId { term: 1, index: 3 }, + lower_index_log_id: LogId { term: 2, index: 2 }, + }, + .. + })) } Ok(()) } - pub async fn df_get_initial_state_dirty_log(builder: &B) -> Result<()> { + pub async fn df_get_initial_state_dirty_log(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; tracing::info!("--- dirty log: log.index > last_applied.index && log < last_applied"); @@ -920,13 +929,22 @@ where .await?; let state = store.get_initial_state().await; - assert!(state.is_err()); + let e = state.unwrap_err().into_defensive().unwrap(); + + assert!(matches!(e, DefensiveError { + subject: ErrorSubject::Log(LogId { term: 1, index: 3 }), + violation: Violation::DirtyLog { + higher_index_log_id: LogId { term: 1, index: 3 }, + lower_index_log_id: LogId { term: 2, index: 2 }, + }, + .. + })) } Ok(()) } - pub async fn df_save_hard_state_ascending(builder: &B) -> Result<()> { + pub async fn df_save_hard_state_ascending(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; store @@ -945,7 +963,12 @@ where }) .await; - assert!(res.is_err()); + let e = res.unwrap_err().into_defensive().unwrap(); + assert!(matches!(e, DefensiveError { + subject: ErrorSubject::HardState, + violation: Violation::TermNotAscending { curr: 10, to: 9 }, + .. + })); let state = store.get_initial_state().await?; @@ -967,7 +990,21 @@ where }) .await; - assert!(res.is_err()); + let e = res.unwrap_err().into_defensive().unwrap(); + assert!(matches!(e, DefensiveError { + subject: ErrorSubject::HardState, + violation: Violation::VotedForChanged { + curr: HardState { + current_term: 10, + voted_for: Some(NODE_ID) + }, + to: HardState { + current_term: 10, + voted_for: None + } + }, + .. + })); let state = store.get_initial_state().await?; @@ -989,7 +1026,21 @@ where }) .await; - assert!(res.is_err()); + let e = res.unwrap_err().into_defensive().unwrap(); + assert!(matches!(e, DefensiveError { + subject: ErrorSubject::HardState, + violation: Violation::VotedForChanged { + curr: HardState { + current_term: 10, + voted_for: Some(NODE_ID) + }, + to: HardState { + current_term: 10, + voted_for: Some(1000) + } + }, + .. + })); let state = store.get_initial_state().await?; @@ -1005,7 +1056,7 @@ where Ok(()) } - pub async fn df_get_log_entries(builder: &B) -> Result<()> { + pub async fn df_get_log_entries(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; @@ -1017,18 +1068,56 @@ where // mismatched bound. let res = store.get_log_entries(11..).await; - assert!(res.is_err()); + let e = res.unwrap_err().into_defensive().unwrap(); + assert!(matches!(e, DefensiveError { + subject: ErrorSubject::LogIndex(11), + violation: Violation::LogIndexNotFound { want: 11, got: None }, + .. + })); let res = store.get_log_entries(1..1).await; - assert!(res.is_err()); + let e = res.unwrap_err().into_defensive().unwrap(); + assert!(matches!(e, DefensiveError { + subject: ErrorSubject::Logs, + violation: Violation::RangeEmpty { + start: Some(1), + end: Some(0) + }, + .. 
+ })); let res = store.get_log_entries(0..1).await; - assert!(res.is_err()); + let e = res.unwrap_err().into_defensive().unwrap(); + assert!(matches!(e, DefensiveError { + subject: ErrorSubject::LogIndex(0), + violation: Violation::LogIndexNotFound { want: 0, got: None }, + .. + })); + + let res = store.get_log_entries(0..2).await; + let e = res.unwrap_err().into_defensive().unwrap(); + assert!(matches!(e, DefensiveError { + subject: ErrorSubject::LogIndex(0), + violation: Violation::LogIndexNotFound { want: 0, got: Some(1) }, + .. + })); + + let res = store.get_log_entries(10..12).await; + let e = res.unwrap_err().into_defensive().unwrap(); + println!("{}", e); + assert!(matches!(e, DefensiveError { + subject: ErrorSubject::LogIndex(11), + violation: Violation::LogIndexNotFound { + want: 11, + got: Some(10) + }, + .. + })); Ok(()) } - pub async fn df_get_last_log_id(builder: &B) -> Result<()> { + pub async fn df_get_last_log_id(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; tracing::info!("--- last log_id.index == last_applied.index"); @@ -1048,7 +1137,16 @@ where .await?; let res = store.get_last_log_id().await; - assert!(res.is_err()); + + let e = res.unwrap_err().into_defensive().unwrap(); + assert_eq!(ErrorSubject::Log(LogId { term: 1, index: 1 }), e.subject); + assert_eq!( + Violation::DirtyLog { + higher_index_log_id: LogId { term: 1, index: 1 }, + lower_index_log_id: LogId { term: 2, index: 1 } + }, + e.violation + ); } tracing::info!("--- last log_id.index > last_applied.index => last log_id > last_applied"); @@ -1063,35 +1161,65 @@ where store.defensive(true).await; let res = store.get_last_log_id().await; - assert!(res.is_err()); + + let e = res.unwrap_err().into_defensive().unwrap(); + assert_eq!(ErrorSubject::Log(LogId { term: 1, index: 2 }), e.subject); + assert_eq!( + Violation::DirtyLog { + higher_index_log_id: LogId { term: 1, index: 2 }, + lower_index_log_id: LogId { term: 2, index: 1 } + }, + e.violation + ); } Ok(()) } - pub async fn df_delete_logs_from_nonempty_range(builder: &B) -> Result<()> { + pub async fn df_delete_logs_from_nonempty_range(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?; let res = store.delete_logs_from(10..10).await; - assert!(res.is_err()); + + let e = res.unwrap_err().into_defensive().unwrap(); + assert_eq!(ErrorSubject::Logs, e.subject); + assert_eq!( + Violation::RangeEmpty { + start: Some(10), + end: Some(9), + }, + e.violation + ); let res = store.delete_logs_from(1..5).await; - assert!(res.is_err()); + + let e = res.unwrap_err().into_defensive().unwrap(); + assert_eq!(ErrorSubject::Logs, e.subject); + assert_eq!( + Violation::RangeNotHalfOpen { + start: Bound::Included(1), + end: Bound::Excluded(5), + }, + e.violation + ); Ok(()) } - pub async fn df_append_to_log_nonempty_input(builder: &B) -> Result<()> { + pub async fn df_append_to_log_nonempty_input(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; let res = store.append_to_log(Vec::<&Entry<_>>::new().as_slice()).await; - assert!(res.is_err()); + + let e = res.unwrap_err().into_defensive().unwrap(); + assert_eq!(ErrorSubject::Logs, e.subject); + assert_eq!(Violation::LogsEmpty, e.violation); Ok(()) } - pub async fn df_append_to_log_nonconsecutive_input(builder: &B) -> Result<()> { + pub async fn df_append_to_log_nonconsecutive_input(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; let res = store @@ -1106,12 
+1234,21 @@ where }, ]) .await; - assert!(res.is_err()); + + let e = res.unwrap_err().into_defensive().unwrap(); + assert_eq!(ErrorSubject::Logs, e.subject); + assert_eq!( + Violation::LogsNonConsecutive { + prev: LogId { term: 1, index: 1 }, + next: LogId { term: 1, index: 3 }, + }, + e.violation + ); Ok(()) } - pub async fn df_append_to_log_eq_last_plus_one(builder: &B) -> Result<()> { + pub async fn df_append_to_log_eq_last_plus_one(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; tracing::info!("-- log_id <= last_applied"); @@ -1145,12 +1282,20 @@ where }]) .await; - assert!(res.is_err()); + let e = res.unwrap_err().into_defensive().unwrap(); + assert_eq!(ErrorSubject::Log(LogId { term: 3, index: 4 }), e.subject); + assert_eq!( + Violation::LogsNonConsecutive { + prev: LogId { term: 1, index: 2 }, + next: LogId { term: 3, index: 4 }, + }, + e.violation + ); Ok(()) } - pub async fn df_append_to_log_eq_last_applied_plus_one(builder: &B) -> Result<()> { + pub async fn df_append_to_log_eq_last_applied_plus_one(builder: &B) -> anyhow::Result<()> { // last_log: 1,1 // last_applied: 1,2 // append_to_log: 1,4 @@ -1195,12 +1340,20 @@ where }]) .await; - assert!(res.is_err()); + let e = res.unwrap_err().into_defensive().unwrap(); + assert_eq!(ErrorSubject::Log(LogId { term: 1, index: 4 }), e.subject); + assert_eq!( + Violation::LogsNonConsecutive { + prev: LogId { term: 1, index: 2 }, + next: LogId { term: 1, index: 4 }, + }, + e.violation + ); Ok(()) } - pub async fn df_append_to_log_gt_last_log_id(builder: &B) -> Result<()> { + pub async fn df_append_to_log_gt_last_log_id(builder: &B) -> anyhow::Result<()> { // last_log: 2,2 // append_to_log: 1,3: index == last + 1 but term is lower let store = builder.new_store(NODE_ID).await; @@ -1225,12 +1378,20 @@ where }]) .await; - assert!(res.is_err()); + let e = res.unwrap_err().into_defensive().unwrap(); + assert_eq!(ErrorSubject::Log(LogId { term: 1, index: 3 }), e.subject); + assert_eq!( + Violation::LogsNonConsecutive { + prev: LogId { term: 2, index: 2 }, + next: LogId { term: 1, index: 3 }, + }, + e.violation + ); Ok(()) } - pub async fn df_append_to_log_gt_last_applied_id(builder: &B) -> Result<()> { + pub async fn df_append_to_log_gt_last_applied_id(builder: &B) -> anyhow::Result<()> { // last_log: 2,1 // last_applied: 2,2 // append_to_log: 1,3: index == last + 1 but term is lower @@ -1271,21 +1432,32 @@ where }]) .await; - assert!(res.is_err()); + let e = res.unwrap_err().into_defensive().unwrap(); + assert_eq!(ErrorSubject::Log(LogId { term: 1, index: 3 }), e.subject); + assert_eq!( + Violation::LogsNonConsecutive { + prev: LogId { term: 2, index: 2 }, + next: LogId { term: 1, index: 3 }, + }, + e.violation + ); Ok(()) } - pub async fn df_apply_nonempty_input(builder: &B) -> Result<()> { + pub async fn df_apply_nonempty_input(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; let res = store.apply_to_state_machine(Vec::<&Entry<_>>::new().as_slice()).await; - assert!(res.is_err()); + + let e = res.unwrap_err().into_defensive().unwrap(); + assert_eq!(ErrorSubject::Logs, e.subject); + assert_eq!(Violation::LogsEmpty, e.violation); Ok(()) } - pub async fn df_apply_index_eq_last_applied_plus_one(builder: &B) -> Result<()> { + pub async fn df_apply_index_eq_last_applied_plus_one(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; let entry = Entry { @@ -1305,7 +1477,16 @@ where tracing::info!("--- re-apply 1th"); { let res = 
store.apply_to_state_machine(&[&entry]).await; - assert!(res.is_err()); + + let e = res.unwrap_err().into_defensive().unwrap(); + assert_eq!(ErrorSubject::Apply(LogId { term: 3, index: 1 }), e.subject); + assert_eq!( + Violation::ApplyNonConsecutive { + prev: LogId { term: 3, index: 1 }, + next: LogId { term: 3, index: 1 }, + }, + e.violation + ); } tracing::info!("--- apply 3rd when there is only 1st"); @@ -1322,13 +1503,22 @@ where }), }; let res = store.apply_to_state_machine(&[&entry]).await; - assert!(res.is_err()); + + let e = res.unwrap_err().into_defensive().unwrap(); + assert_eq!(ErrorSubject::Apply(LogId { term: 3, index: 3 }), e.subject); + assert_eq!( + Violation::ApplyNonConsecutive { + prev: LogId { term: 3, index: 1 }, + next: LogId { term: 3, index: 3 }, + }, + e.violation + ); } Ok(()) } - pub async fn df_apply_gt_last_applied_id(builder: &B) -> Result<()> { + pub async fn df_apply_gt_last_applied_id(builder: &B) -> anyhow::Result<()> { let store = builder.new_store(NODE_ID).await; let entry = Entry { @@ -1346,6 +1536,16 @@ where }; let res = store.apply_to_state_machine(&[&entry]).await; assert!(res.is_err()); + + let e = res.unwrap_err().into_defensive().unwrap(); + assert_eq!(ErrorSubject::Apply(LogId { term: 2, index: 2 }), e.subject); + assert_eq!( + Violation::ApplyNonConsecutive { + prev: LogId { term: 3, index: 1 }, + next: LogId { term: 2, index: 2 }, + }, + e.violation + ); } Ok(())
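
These defensive tests all follow the pattern the new error types were designed for: instead of a bare `assert!(res.is_err())`, they extract the `DefensiveError` and pin down the exact `ErrorSubject`/`Violation` pair. A condensed sketch of that pattern (store construction elided; names as used throughout this suite):

    // Sketch: the assertion pattern used by the defensive-store tests.
    let store = builder.new_store(NODE_ID).await;
    store.defensive(true).await;

    // Feed an input that violates an invariant: empty entries here.
    let res = store.apply_to_state_machine(Vec::<&Entry<_>>::new().as_slice()).await;

    let e = res.unwrap_err().into_defensive().unwrap();
    assert_eq!(ErrorSubject::Logs, e.subject);
    assert_eq!(Violation::LogsEmpty, e.violation);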