From 26b9e06dc7c2e91c01304a03f3f5c52ad90f8973 Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Wed, 26 Jul 2023 11:22:38 +0800 Subject: [PATCH 1/9] refactor: break down the dependency between CurpServer and CurpClient Signed-off-by: Phoeniix Zhao --- curp-external-api/src/snapshot.rs | 3 ++- curp/src/log_entry.rs | 3 +-- curp/src/rpc/mod.rs | 5 ++--- curp/src/server/cmd_worker/mod.rs | 10 +++++----- curp/src/server/curp_node.rs | 9 +-------- curp/src/server/gc.rs | 2 +- curp/src/server/mod.rs | 12 +----------- curp/src/server/raw_curp/log.rs | 2 +- curp/src/server/raw_curp/mod.rs | 13 +------------ curp/src/server/raw_curp/tests.rs | 2 +- curp/src/snapshot.rs | 5 ++--- simulation/tests/it/curp/server_election.rs | 3 +-- simulation/tests/it/curp/server_recovery.rs | 3 +-- xline-client/src/types/lease.rs | 2 +- xline-client/tests/watch.rs | 2 ++ xline/src/server/xline_server.rs | 18 +++++++++++++++--- xline/tests/auth_test.rs | 1 - xline/tests/kv_test.rs | 1 - xline/tests/lease_test.rs | 1 - xline/tests/watch_test.rs | 1 - xlineapi/src/lib.rs | 2 +- 21 files changed, 39 insertions(+), 61 deletions(-) diff --git a/curp-external-api/src/snapshot.rs b/curp-external-api/src/snapshot.rs index 8b21ecdf2..1f653041a 100644 --- a/curp-external-api/src/snapshot.rs +++ b/curp-external-api/src/snapshot.rs @@ -1,6 +1,7 @@ +use std::error::Error; + use async_trait::async_trait; use engine::Snapshot as EngineSnapshot; -use std::error::Error; /// The snapshot allocation is handled by the upper-level application #[allow(clippy::module_name_repetitions)] // it's re-exported in lib diff --git a/curp/src/log_entry.rs b/curp/src/log_entry.rs index 5a5155d0e..af6a7517c 100644 --- a/curp/src/log_entry.rs +++ b/curp/src/log_entry.rs @@ -1,8 +1,7 @@ use std::sync::Arc; -use serde::{Deserialize, Serialize}; - use curp_external_api::LogIndex; +use serde::{Deserialize, Serialize}; /// Log entry #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/curp/src/rpc/mod.rs b/curp/src/rpc/mod.rs index fb837d2bd..ac4a475d4 100644 --- a/curp/src/rpc/mod.rs +++ b/curp/src/rpc/mod.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use serde::{de::DeserializeOwned, Serialize}; -pub use self::proto::protocol_server::ProtocolServer; pub(crate) use self::proto::{ fetch_read_state_response::ReadState, propose_response::ExeResult, @@ -13,8 +12,8 @@ pub(crate) use self::proto::{ WaitSyncedRequest, WaitSyncedResponse, }; pub use self::proto::{ - propose_response, protocol_client, FetchLeaderRequest, FetchLeaderResponse, ProposeRequest, - ProposeResponse, + propose_response, protocol_client, protocol_server::ProtocolServer, FetchLeaderRequest, + FetchLeaderResponse, ProposeRequest, ProposeResponse, }; use crate::{ cmd::{Command, ProposeId}, diff --git a/curp/src/server/cmd_worker/mod.rs b/curp/src/server/cmd_worker/mod.rs index 5a24ada1f..edb13b690 100644 --- a/curp/src/server/cmd_worker/mod.rs +++ b/curp/src/server/cmd_worker/mod.rs @@ -265,16 +265,16 @@ pub(super) fn start_cmd_workers< mod tests { use std::time::Duration; - use tokio::{sync::mpsc, time::Instant}; - use tracing_test::traced_test; - - use super::*; - use crate::log_entry::LogEntry; use curp_test_utils::{ mock_role_change, sleep_millis, sleep_secs, test_cmd::{TestCE, TestCommand}, }; use test_macros::abort_on_panic; + use tokio::{sync::mpsc, time::Instant}; + use tracing_test::traced_test; + + use super::*; + use crate::log_entry::LogEntry; // This should happen in fast path in most cases #[traced_test] diff --git a/curp/src/server/curp_node.rs b/curp/src/server/curp_node.rs 
index 0c03c5e27..54caac77b 100644 --- a/curp/src/server/curp_node.rs +++ b/curp/src/server/curp_node.rs @@ -13,7 +13,7 @@ use tokio::{ time::MissedTickBehavior, }; use tracing::{debug, error, info, warn}; -use utils::config::{ClientTimeout, CurpConfig}; +use utils::config::CurpConfig; use super::{ cmd_board::{CmdBoardRef, CommandBoard}, @@ -24,7 +24,6 @@ use super::{ storage::{StorageApi, StorageError}, }; use crate::{ - client::Client, cmd::{Command, CommandExecutor}, error::RpcError, log_entry::LogEntry, @@ -274,12 +273,6 @@ impl CurpNode { /// Spawned tasks impl CurpNode { - /// get curp inner client from `CurpNode` - #[inline] - pub(crate) async fn inner_client(&self, client_timeout: ClientTimeout) -> Client { - self.curp.inner_client(client_timeout).await - } - /// Tick periodically async fn election_task( curp: Arc>, diff --git a/curp/src/server/gc.rs b/curp/src/server/gc.rs index 091b7908a..11a036e67 100644 --- a/curp/src/server/gc.rs +++ b/curp/src/server/gc.rs @@ -72,6 +72,7 @@ async fn gc_cmd_board(cmd_board: CmdBoardRef, interval: mod tests { use std::{sync::Arc, time::Duration}; + use curp_test_utils::{sleep_secs, test_cmd::TestCommand}; use parking_lot::{Mutex, RwLock}; use test_macros::abort_on_panic; @@ -84,7 +85,6 @@ mod tests { spec_pool::{SpecPoolRef, SpeculativePool}, }, }; - use curp_test_utils::{sleep_secs, test_cmd::TestCommand}; #[tokio::test] #[abort_on_panic] diff --git a/curp/src/server/mod.rs b/curp/src/server/mod.rs index 22e1c7fde..7d2879416 100644 --- a/curp/src/server/mod.rs +++ b/curp/src/server/mod.rs @@ -9,14 +9,10 @@ use tokio_stream::wrappers::TcpListenerStream; #[cfg(not(madsim))] use tracing::info; use tracing::instrument; -use utils::{ - config::{ClientTimeout, CurpConfig}, - tracing::Extract, -}; +use utils::{config::CurpConfig, tracing::Extract}; use self::curp_node::{CurpError, CurpNode}; use crate::{ - client::Client, cmd::{Command, CommandExecutor}, error::ServerError, members::ClusterMember, @@ -303,12 +299,6 @@ impl Rpc { pub fn leader_rx(&self) -> broadcast::Receiver> { self.inner.leader_rx() } - - /// Get an inner client - #[inline] - pub async fn inner_client(&self, client_timeout: ClientTimeout) -> Client { - self.inner.inner_client(client_timeout).await - } } impl From for tonic::Status { diff --git a/curp/src/server/raw_curp/log.rs b/curp/src/server/raw_curp/log.rs index c89a457d4..3a4e3a38d 100644 --- a/curp/src/server/raw_curp/log.rs +++ b/curp/src/server/raw_curp/log.rs @@ -372,10 +372,10 @@ impl Log { mod tests { use std::{iter::repeat, ops::Index, sync::Arc}; + use curp_test_utils::test_cmd::TestCommand; use utils::config::{default_batch_max_size, default_log_entries_cap}; use super::*; - use curp_test_utils::test_cmd::TestCommand; // impl index for test is handy impl Index for Log { diff --git a/curp/src/server/raw_curp/mod.rs b/curp/src/server/raw_curp/mod.rs index c75a7be82..bf979b7ef 100644 --- a/curp/src/server/raw_curp/mod.rs +++ b/curp/src/server/raw_curp/mod.rs @@ -29,7 +29,7 @@ use tracing::{ log::{log_enabled, Level}, }; use utils::{ - config::{ClientTimeout, CurpConfig}, + config::CurpConfig, parking_lot_lock::{MutexMap, RwLockMap}, }; @@ -39,7 +39,6 @@ use self::{ }; use super::cmd_worker::CEEventTxApi; use crate::{ - client::Client, cmd::{Command, ProposeId}, error::ProposeError, log_entry::LogEntry, @@ -627,16 +626,6 @@ impl RawCurp { raw_curp } - /// get curp inner client - pub(crate) async fn inner_client(&self, timeout: ClientTimeout) -> Client { - Client::::new( - Some(self.id().clone()), - 
self.ctx.cluster_info.all_members(), - timeout, - ) - .await - } - /// Create a new `RawCurp` /// `is_leader` will only take effect when all servers start from a fresh state #[allow(clippy::too_many_arguments)] // only called once diff --git a/curp/src/server/raw_curp/tests.rs b/curp/src/server/raw_curp/tests.rs index 0154efd17..780073379 100644 --- a/curp/src/server/raw_curp/tests.rs +++ b/curp/src/server/raw_curp/tests.rs @@ -1,5 +1,6 @@ use std::time::Instant; +use curp_test_utils::{mock_role_change, test_cmd::TestCommand}; use test_macros::abort_on_panic; use tokio::{sync::oneshot, time::sleep}; use tracing_test::traced_test; @@ -18,7 +19,6 @@ use crate::{ }, LogIndex, }; -use curp_test_utils::{mock_role_change, test_cmd::TestCommand}; // Hooks for tests impl RawCurp { diff --git a/curp/src/snapshot.rs b/curp/src/snapshot.rs index 1926976bf..c451306ac 100644 --- a/curp/src/snapshot.rs +++ b/curp/src/snapshot.rs @@ -1,8 +1,7 @@ -#[allow(clippy::module_name_repetitions)] // it's re-exported in lib -pub use curp_external_api::snapshot::SnapshotAllocator; - use std::fmt::Debug; +#[allow(clippy::module_name_repetitions)] // it's re-exported in lib +pub use curp_external_api::snapshot::SnapshotAllocator; use engine::Snapshot as EngineSnapshot; /// Snapshot diff --git a/simulation/tests/it/curp/server_election.rs b/simulation/tests/it/curp/server_election.rs index cd659cbf6..5b47a8969 100644 --- a/simulation/tests/it/curp/server_election.rs +++ b/simulation/tests/it/curp/server_election.rs @@ -1,7 +1,6 @@ use curp_test_utils::{init_logger, sleep_secs, test_cmd::TestCommand}; -use utils::config::ClientTimeout; - use simulation::curp_group::CurpGroup; +use utils::config::ClientTimeout; /// Wait some time for the election to finish, and get the leader to ensure that the election is /// completed. diff --git a/simulation/tests/it/curp/server_recovery.rs b/simulation/tests/it/curp/server_recovery.rs index 4ed9ae228..de9b55762 100644 --- a/simulation/tests/it/curp/server_recovery.rs +++ b/simulation/tests/it/curp/server_recovery.rs @@ -5,11 +5,10 @@ use std::sync::Arc; use curp_test_utils::{init_logger, sleep_secs, test_cmd::TestCommand, TEST_TABLE}; use engine::StorageEngine; use itertools::Itertools; +use simulation::curp_group::{CurpGroup, ProposeRequest}; use tracing::debug; use utils::config::ClientTimeout; -use simulation::curp_group::{CurpGroup, ProposeRequest}; - #[madsim::test] async fn leader_crash_and_recovery() { init_logger(); diff --git a/xline-client/src/types/lease.rs b/xline-client/src/types/lease.rs index 04e41ee98..4729c7c83 100644 --- a/xline-client/src/types/lease.rs +++ b/xline-client/src/types/lease.rs @@ -1,10 +1,10 @@ -use crate::error::{ClientError, Result}; use futures::channel::mpsc::Sender; pub use xlineapi::{ LeaseGrantResponse, LeaseKeepAliveResponse, LeaseLeasesResponse, LeaseRevokeResponse, LeaseStatus, LeaseTimeToLiveResponse, }; +use crate::error::{ClientError, Result}; /// The lease keep alive handle. 
#[derive(Debug)] diff --git a/xline-client/tests/watch.rs b/xline-client/tests/watch.rs index 788c66fa0..03efedec3 100644 --- a/xline-client/tests/watch.rs +++ b/xline-client/tests/watch.rs @@ -6,6 +6,8 @@ use xline_client::{ types::watch::{EventType, WatchRequest}, }; +use crate::common::get_cluster_client; + mod common; #[tokio::test] diff --git a/xline/src/server/xline_server.rs b/xline/src/server/xline_server.rs index 85e7ea101..8fc76ef60 100644 --- a/xline/src/server/xline_server.rs +++ b/xline/src/server/xline_server.rs @@ -2,7 +2,9 @@ use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration}; use anyhow::Result; use clippy_utilities::{Cast, OverflowArithmetic}; -use curp::{members::ClusterMember, server::Rpc, ProtocolServer, SnapshotAllocator}; +use curp::{ + client::Client, members::ClusterMember, server::Rpc, ProtocolServer, SnapshotAllocator, +}; use event_listener::Event; use jsonwebtoken::{DecodingKey, EncodingKey}; use tokio::{net::TcpListener, sync::mpsc::channel}; @@ -44,6 +46,9 @@ use crate::{ /// Rpc Server of curp protocol type CurpServer = Rpc>; +/// Rpc Client of curp protocol +type CurpClient = Client; + /// Xline server #[derive(Debug)] pub struct XlineServer { @@ -314,6 +319,15 @@ impl XlineServer { _ => unimplemented!(), }; + let client = Arc::new( + CurpClient::new( + Some(self.cluster_info.self_id().clone()), + self.cluster_info.all_members(), + self.client_timeout, + ) + .await, + ); + let curp_server = CurpServer::new( Arc::clone(&self.cluster_info), self.is_leader, @@ -324,8 +338,6 @@ impl XlineServer { ) .await; - let client = Arc::new(curp_server.inner_client(self.client_timeout).await); - Ok(( KvServer::new( kv_storage, diff --git a/xline/tests/auth_test.rs b/xline/tests/auth_test.rs index 47c4b4f4f..0b0bcca5e 100644 --- a/xline/tests/auth_test.rs +++ b/xline/tests/auth_test.rs @@ -3,7 +3,6 @@ use std::error::Error; use etcd_client::{AuthClient, ConnectOptions, GetOptions, Permission, PermissionType}; use test_macros::abort_on_panic; use xline::client::kv_types::{PutRequest, RangeRequest}; - use xline_test_utils::Cluster; #[tokio::test(flavor = "multi_thread", worker_threads = 10)] diff --git a/xline/tests/kv_test.rs b/xline/tests/kv_test.rs index c2c41e83e..063119c4a 100644 --- a/xline/tests/kv_test.rs +++ b/xline/tests/kv_test.rs @@ -5,7 +5,6 @@ use test_macros::abort_on_panic; use xline::client::kv_types::{ DeleteRangeRequest, PutRequest, RangeRequest, SortOrder, SortTarget, }; - use xline_test_utils::Cluster; #[tokio::test(flavor = "multi_thread", worker_threads = 10)] diff --git a/xline/tests/lease_test.rs b/xline/tests/lease_test.rs index 2dd0bcf9f..f0ddbb969 100644 --- a/xline/tests/lease_test.rs +++ b/xline/tests/lease_test.rs @@ -3,7 +3,6 @@ use std::{error::Error, time::Duration}; use test_macros::abort_on_panic; use tracing::info; use xline::client::kv_types::{LeaseGrantRequest, PutRequest, RangeRequest}; - use xline_test_utils::Cluster; #[tokio::test(flavor = "multi_thread", worker_threads = 10)] diff --git a/xline/tests/watch_test.rs b/xline/tests/watch_test.rs index 9267c674a..0524cfd89 100644 --- a/xline/tests/watch_test.rs +++ b/xline/tests/watch_test.rs @@ -3,7 +3,6 @@ use std::error::Error; use etcd_client::EventType; use test_macros::abort_on_panic; use xline::client::kv_types::{DeleteRangeRequest, PutRequest}; - use xline_test_utils::Cluster; #[tokio::test(flavor = "multi_thread", worker_threads = 10)] diff --git a/xlineapi/src/lib.rs b/xlineapi/src/lib.rs index 421717b7f..8709a3553 100644 --- a/xlineapi/src/lib.rs +++ 
b/xlineapi/src/lib.rs @@ -181,7 +181,6 @@ mod leasepb { use serde::{Deserialize, Serialize}; -pub use self::etcdserverpb::range_request::{SortOrder, SortTarget}; pub use self::{ authpb::{permission::Type, Permission, Role, User, UserAddOptions}, etcdserverpb::{ @@ -193,6 +192,7 @@ pub use self::{ lease_server::{Lease, LeaseServer}, maintenance_client::MaintenanceClient, maintenance_server::{Maintenance, MaintenanceServer}, + range_request::{SortOrder, SortTarget}, request_op::Request, response_op::Response, watch_client::WatchClient, From 897eb96f39231a62d8b79af60a05d26cc20d3891 Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Wed, 26 Jul 2023 15:08:16 +0800 Subject: [PATCH 2/9] feat(compactor): implement the revision compactor Refs: 188 Signed-off-by: Phoeniix Zhao --- xline/src/server/xline_server.rs | 15 ++- xline/src/state.rs | 18 ++- xline/src/storage/compact/mod.rs | 70 ++++++++++- .../src/storage/compact/revision_compactor.rs | 109 ++++++++++++++++++ xline/src/storage/kv_store.rs | 4 +- xline/src/storage/mod.rs | 4 +- 6 files changed, 209 insertions(+), 11 deletions(-) create mode 100644 xline/src/storage/compact/revision_compactor.rs diff --git a/xline/src/server/xline_server.rs b/xline/src/server/xline_server.rs index 8fc76ef60..c4fcc91a5 100644 --- a/xline/src/server/xline_server.rs +++ b/xline/src/server/xline_server.rs @@ -33,7 +33,7 @@ use crate::{ }, state::State, storage::{ - compact::{compactor, COMPACT_CHANNEL_SIZE}, + compact::{compact_bg_task, COMPACT_CHANNEL_SIZE, auto_compactor}, index::Index, kvwatcher::KvWatcher, lease_store::LeaseCollection, @@ -139,7 +139,7 @@ impl XlineServer { compact_task_tx, Arc::clone(&lease_collection), )); - let _hd = tokio::spawn(compactor( + let _hd = tokio::spawn(compact_bg_task( Arc::clone(&kv_storage), Arc::clone(&index), *self.compact_cfg.compact_batch_size(), @@ -301,7 +301,6 @@ impl XlineServer { let index_barrier = Arc::new(IndexBarrier::new()); let id_barrier = Arc::new(IdBarrier::new()); - let state = State::new(Arc::clone(&lease_storage)); let ce = CommandExecutor::new( Arc::clone(&kv_storage), Arc::clone(&auth_storage), @@ -328,6 +327,16 @@ impl XlineServer { .await, ); + let auto_compactor = auto_compactor( + self.is_leader, + Arc::clone(&client), + header_gen.revision_arc(), + Arc::clone(&self.shutdown_trigger), + 1000, + ).await; + + + let state = State::new(Arc::clone(&lease_storage), Some(auto_compactor)); let curp_server = CurpServer::new( Arc::clone(&self.cluster_info), self.is_leader, diff --git a/xline/src/state.rs b/xline/src/state.rs index fa5c3442a..b8950e6c7 100644 --- a/xline/src/state.rs +++ b/xline/src/state.rs @@ -2,28 +2,40 @@ use std::{sync::Arc, time::Duration}; use curp::role_change::RoleChange; -use crate::storage::{storage_api::StorageApi, LeaseStore}; +use crate::storage::{ + storage_api::StorageApi, + LeaseStore, + compact::Compactor, +}; /// State of current node #[derive(Debug)] pub(crate) struct State { /// lease storage lease_storage: Arc>, + /// auto compactor + auto_compactor: Option>, } impl RoleChange for State { fn on_election_win(&self) { self.lease_storage.promote(Duration::from_secs(1)); // TODO: extend should be election timeout + if let Some(auto_compactor) = self.auto_compactor.as_ref() { + auto_compactor.resume(); + } } fn on_calibrate(&self) { self.lease_storage.demote(); + if let Some(auto_compactor) = self.auto_compactor.as_ref() { + auto_compactor.pause(); + } } } impl State { /// Create a new State - pub(super) fn new(lease_storage: Arc>) -> Self { - Self { lease_storage } + 
pub(super) fn new(lease_storage: Arc>, auto_compactor: Option>) -> Self { + Self { lease_storage, auto_compactor } } } diff --git a/xline/src/storage/compact/mod.rs b/xline/src/storage/compact/mod.rs index 4c335ad9e..d2c4559de 100644 --- a/xline/src/storage/compact/mod.rs +++ b/xline/src/storage/compact/mod.rs @@ -1,19 +1,87 @@ use std::{sync::Arc, time::Duration}; +use async_trait::async_trait; +use curp::{client::Client, cmd::ProposeId, error::ProposeError}; use event_listener::Event; +use revision_compactor::RevisionCompactor; use tokio::{sync::mpsc::Receiver, time::sleep}; +use uuid::Uuid; use super::{ index::{Index, IndexOperate}, storage_api::StorageApi, KvStore, }; +use crate::{ + revision_number::RevisionNumberGenerator, + rpc::{CompactionRequest, RequestWithToken}, + server::command::Command, +}; + +/// mod revision compactor; +mod revision_compactor; /// compact task channel size pub(crate) const COMPACT_CHANNEL_SIZE: usize = 32; +/// Compactor trait definition +#[async_trait] +pub(crate) trait Compactor: std::fmt::Debug + Send + Sync { + /// run an auto-compactor + async fn run(&self); + /// pause an auto-compactor when the current node denotes to a non-leader role + fn pause(&self); + /// resume an auto-compactor when the current becomes a leader + fn resume(&self); +} + +/// `Compactable` trait indicates a method that receives a given revision and proposes a compact proposal +#[cfg_attr(test, mockall::automock)] +#[async_trait] +pub(crate) trait Compactable: std::fmt::Debug + Send + Sync { + /// do compact + async fn compact(&self, revision: i64) -> Result<(), ProposeError>; +} + +#[async_trait] +impl Compactable for Client { + async fn compact(&self, revision: i64) -> Result<(), ProposeError> { + let request = CompactionRequest { + revision, + physical: false, + }; + let request_wrapper = RequestWithToken::new_with_token(request.into(), None); + let propose_id = ProposeId::new(format!("auto-compactor-{}", Uuid::new_v4())); + let cmd = Command::new(vec![], request_wrapper, propose_id); + let _cmd_res = self.propose(cmd).await?; + Ok(()) + } +} + +/// Boot up an auto-compactor background task. 
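+/// It spawns the compactor's `run` loop on a background task and returns a handle that the caller can use to pause or resume compaction when the node's role changes.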
+pub(crate) async fn auto_compactor( + is_leader: bool, + client: Arc>, + revision_getter: Arc, + shutdown_trigger: Arc, + retention: i64, +) -> Arc { + let auto_compactor: Arc = RevisionCompactor::new_arc( + is_leader, + client, + revision_getter, + shutdown_trigger, + retention, + ); + let compactor_handle = Arc::clone(&auto_compactor); + let _hd = tokio::spawn( async move{ + auto_compactor.run().await; + }); + compactor_handle +} + /// background compact executor -pub(crate) async fn compactor( +pub(crate) async fn compact_bg_task( kv_store: Arc>, index: Arc, batch_limit: usize, diff --git a/xline/src/storage/compact/revision_compactor.rs b/xline/src/storage/compact/revision_compactor.rs new file mode 100644 index 000000000..e79dc510d --- /dev/null +++ b/xline/src/storage/compact/revision_compactor.rs @@ -0,0 +1,109 @@ +use std::{ + sync::{ + atomic::{AtomicBool, Ordering::Relaxed}, + Arc, + }, + time::{Duration, Instant}, +}; + +use clippy_utilities::OverflowArithmetic; +use event_listener::Event; +use tracing::{info, warn}; + +use super::{Compactable, Compactor}; +use crate::revision_number::RevisionNumberGenerator; + +/// check for the need of compaction every 5 minutes +const CHECK_INTERVAL: Duration = Duration::from_secs(5 * 60); + +/// Revision auto compactor +#[derive(Debug)] +pub(crate) struct RevisionCompactor { + /// `is_leader` indicates whether the current node is a leader or not. + is_leader: AtomicBool, + /// curp client + client: Arc, + /// revision getter + revision_getter: Arc, + /// shutdown trigger + shutdown_trigger: Arc, + /// revision retention + retention: i64, +} + +impl RevisionCompactor{ + /// Creates a new revision compactor + pub(super) fn new_arc( + is_leader: bool, + client: Arc, + revision_getter: Arc, + shutdown_trigger: Arc, + retention: i64, + ) -> Arc { + Arc::new(Self { + is_leader: AtomicBool::new(is_leader), + client, + revision_getter, + shutdown_trigger, + retention, + }) + } +} + +#[async_trait::async_trait] +impl Compactor for RevisionCompactor { + fn pause(&self) { + self.is_leader.store(false, Relaxed); + } + + fn resume(&self) { + self.is_leader.store(true, Relaxed); + } + + #[allow(clippy::integer_arithmetic)] + async fn run(&self) { + let prev = 0; + let shutdown_trigger = self.shutdown_trigger.listen(); + let mut ticker = tokio::time::interval(CHECK_INTERVAL); + tokio::pin!(shutdown_trigger); + loop { + tokio::select! { + _ = ticker.tick() => { + if !self.is_leader.load(Relaxed) { + continue; + } + } + // To ensure that each iteration invokes the same `shutdown_trigger` and keeps + // events losing due to the cancellation of `shutdown_trigger` at bay. 
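+                // (re-creating the listener inside the loop could drop a shutdown notification fired between two iterations)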
+ _ = &mut shutdown_trigger => { + break; + } + } + + let target_revision = self.revision_getter.get().overflow_sub(self.retention); + if target_revision <= 0 || target_revision == prev { + continue; + } + + let now = Instant::now(); + info!( + "starting auto revision compaction, revision = {}, retention = {}", + target_revision, self.retention + ); + // TODO: add more error processing logic + if let Err(e) = self.client.compact(target_revision).await { + warn!( + "failed auto revision compaction, revision = {}, retention = {}, error: {:?}", + target_revision, self.retention, e + ); + } else { + info!( + "completed auto revision compaction, revision = {}, retention = {}, took {:?}", + target_revision, + self.retention, + now.elapsed().as_secs() + ); + } + } + } +} diff --git a/xline/src/storage/kv_store.rs b/xline/src/storage/kv_store.rs index b4eeab6c8..434c07acb 100644 --- a/xline/src/storage/kv_store.rs +++ b/xline/src/storage/kv_store.rs @@ -869,7 +869,7 @@ mod test { revision_number::RevisionNumberGenerator, rpc::{Request as UniRequest, RequestOp}, storage::{ - compact::{compactor, COMPACT_CHANNEL_SIZE}, + compact::{compact_bg_task, COMPACT_CHANNEL_SIZE}, db::DB, kvwatcher::KvWatcher, }, @@ -929,7 +929,7 @@ mod test { shutdown_trigger, Duration::from_millis(10), ); - let _compactor = tokio::spawn(compactor( + let _compactor = tokio::spawn(compact_bg_task( Arc::clone(&storage), index, 1000, diff --git a/xline/src/storage/mod.rs b/xline/src/storage/mod.rs index c42d40e44..a3961d980 100644 --- a/xline/src/storage/mod.rs +++ b/xline/src/storage/mod.rs @@ -1,7 +1,7 @@ /// Storage for Auth pub(crate) mod auth_store; -/// Compactor -pub(crate) mod compact; +/// Compact module +pub(super) mod compact; /// Database module pub mod db; /// Execute error From 39334d00030d678e44db91d242ffff015e9ed732 Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Thu, 27 Jul 2023 08:41:01 +0800 Subject: [PATCH 3/9] feat(compactor): implement auto periodic compactor Refs: 188 Signed-off-by: Phoeniix Zhao --- xline/src/storage/compact/mod.rs | 3 + .../src/storage/compact/periodic_compactor.rs | 218 ++++++++++++++++++ .../src/storage/compact/revision_compactor.rs | 4 +- 3 files changed, 223 insertions(+), 2 deletions(-) create mode 100644 xline/src/storage/compact/periodic_compactor.rs diff --git a/xline/src/storage/compact/mod.rs b/xline/src/storage/compact/mod.rs index d2c4559de..34c5a7fc6 100644 --- a/xline/src/storage/compact/mod.rs +++ b/xline/src/storage/compact/mod.rs @@ -21,6 +21,9 @@ use crate::{ /// mod revision compactor; mod revision_compactor; +/// mod periodic compactor; +mod periodic_compactor; + /// compact task channel size pub(crate) const COMPACT_CHANNEL_SIZE: usize = 32; diff --git a/xline/src/storage/compact/periodic_compactor.rs b/xline/src/storage/compact/periodic_compactor.rs new file mode 100644 index 000000000..97ce1544a --- /dev/null +++ b/xline/src/storage/compact/periodic_compactor.rs @@ -0,0 +1,218 @@ +use std::{ + sync::{ + atomic::{AtomicBool, Ordering::Relaxed}, + Arc, + }, + time::{Duration, Instant}, + cmp::Ordering, +}; + +use event_listener::Event; +use tracing::{info, warn}; +use clippy_utilities::OverflowArithmetic; + +use super::{Compactable, Compactor}; +use crate::revision_number::RevisionNumberGenerator; + + +/// `RevisionWindow` is a ring buffer used to store periodically sampled revision. 
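+/// Once the buffer is full, the slot right after the cursor holds the oldest sample, i.e. the revision recorded one retention period ago.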
+struct RevisionWindow { + /// inner ring buffer + ring_buf: Vec, + /// head index of the ring buffer + cursor: usize, + /// sample total amount + retention: usize +} + +impl RevisionWindow { + /// Create a new `RevisionWindow` + fn new(retention: usize) -> Self { + Self { + ring_buf: Vec::with_capacity(retention), + cursor: retention.overflow_sub(1), + retention, + } + } + + /// Store the revision into the inner ring buffer + #[allow(clippy::integer_arithmetic)] + fn sample(&mut self, revision: i64) { + self.cursor = (self.cursor + 1) % self.retention; // it's ok to do so since cursor will never overflow + match self.ring_buf.len().cmp(&self.retention) { + Ordering::Less => {self.ring_buf.push(revision)}, + Ordering::Equal => { + if let Some(element) = self.ring_buf.get_mut(self.cursor){ + *element = revision; + } else { + unreachable!("ring_buf ({:?}) at {} should not be None", self.ring_buf, self.cursor); + } + }, + Ordering::Greater => {unreachable!("the length of RevisionWindow should be less than {}", self.retention)} + } + } + + /// Retrieve the expired revision that is sampled period ago + #[allow(clippy::indexing_slicing, clippy::integer_arithmetic)] + fn expired_revision(&self) -> Option { + debug_assert!(self.ring_buf.len() <= self.retention, "the length of RevisionWindow should be less than {}", self.retention); + if self.ring_buf.len() < self.retention { + None + } else { + let target = (self.cursor + 1) % self.retention; + Some(self.ring_buf[target]) // it's ok to do so since ring_buf[target] should not be None. + } + } +} + +/// Revision auto compactor +#[derive(Debug)] +pub(crate) struct PeriodicCompactor { + /// `is_leader` indicates whether the current node is a leader or not. + is_leader: AtomicBool, + /// curp client + client: Arc, + /// revision getter + revision_getter: Arc, + /// shutdown trigger + shutdown_trigger: Arc, + /// compaction period + period: Duration, +} + +impl PeriodicCompactor{ + #[allow(dead_code)] + /// Creates a new revision compactor + pub(super) fn new_arc( + is_leader: bool, + client: Arc, + revision_getter: Arc, + shutdown_trigger: Arc, + period: Duration, + ) -> Arc { + Arc::new(Self { + is_leader: AtomicBool::new(is_leader), + client, + revision_getter, + shutdown_trigger, + period, + }) + } +} + +/// Calculate the sample frequency and the total amount of samples. 
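+/// The sampling interval is one tenth of the compaction period (capped at one tenth of an hour), and the sample count covers one full period plus one extra slot.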
+fn sample_config(period: Duration) -> (Duration, usize) { + let one_hour = Duration::from_secs(60.overflow_mul(60)); + let base_interval = match period.cmp(&one_hour) { + Ordering::Less => period, + Ordering::Equal | Ordering::Greater => one_hour, + }; + let divisor = 10; + let check_interval = base_interval.checked_div(divisor).unwrap_or_else(|| {unreachable!("duration divisor should not be 0")}); + let check_interval_secs = check_interval.as_secs(); + let periodic_secs = period.as_secs(); + let length = periodic_secs.overflow_div(check_interval_secs).overflow_add(1); + let retention = usize::try_from(length).unwrap_or_else(|e| {panic!("auto compact period is too large: {e}")}); + (check_interval, retention) +} + +#[async_trait::async_trait] +impl Compactor for PeriodicCompactor { + fn pause(&self) { + self.is_leader.store(false, Relaxed); + } + + fn resume(&self) { + self.is_leader.store(true, Relaxed); + } + + #[allow(clippy::integer_arithmetic)] + async fn run(&self) { + let mut last_revision = None; + let shutdown_trigger = self.shutdown_trigger.listen(); + let (sample_frequency, sample_total) = sample_config(self.period); + let mut ticker = tokio::time::interval(sample_frequency); + let mut revision_window = RevisionWindow::new(sample_total); + tokio::pin!(shutdown_trigger); + loop { + revision_window.sample(self.revision_getter.get()); + tokio::select! { + _ = ticker.tick() => { + if !self.is_leader.load(Relaxed) { + continue; + } + } + // To ensure that each iteration invokes the same `shutdown_trigger` and keeps + // events losing due to the cancellation of `shutdown_trigger` at bay. + _ = &mut shutdown_trigger => { + break; + } + } + + let target_revision = revision_window.expired_revision(); + if target_revision != last_revision { + let revision = target_revision.unwrap_or_else(|| {unreachable!("target revision shouldn't be None")}); + let now = Instant::now(); + info!( + "starting auto periodic compaction, revision = {}, period = {:?}", + revision, self.period + ); + // TODO: add more error processing logic + if let Err(e) = self.client.compact(revision).await { + warn!( + "failed auto revision compaction, revision = {}, period = {:?}, error: {:?}", + revision, self.period, e + ); + } else { + info!( + "completed auto revision compaction, revision = {}, period = {:?}, took {:?}", + revision, + self.period, + now.elapsed().as_secs() + ); + last_revision = Some(revision); + } + } + } + } +} + +#[cfg(test)] +mod test { + use super::*; + #[test] + fn revision_window_should_work() { + let mut rw = RevisionWindow::new(3); + assert!(rw.expired_revision().is_none()); + rw.sample(0); + assert!(rw.expired_revision().is_none()); + rw.sample(1); + assert!(rw.expired_revision().is_none()); + rw.sample(2); + assert_eq!(rw.expired_revision(), Some(0)); + // retention is 2 + // The first 3 minutes: 0,1,2 + // The second 2 minutes: 3,4 + rw.sample(3); + rw.sample(4); + assert_eq!(rw.expired_revision(), Some(2)); + } + + #[test] + fn sample_config_should_success() { + // period is 59 minutes, less than one hour + let (interval, retention) = sample_config(Duration::from_secs(59 * 60)); + assert_eq!(interval, Duration::from_secs(354)); + assert_eq!(retention, 11); + + // period is 60 minutes, equal to one hour + let (interval, retention) = sample_config(Duration::from_secs(60 * 60)); + assert_eq!(interval, Duration::from_secs(6 * 60)); + assert_eq!(retention, 11); + + // period is 24 hours, lager than one hour + let (interval, retention) = sample_config(Duration::from_secs(24 * 60 * 60)); + 
assert_eq!(interval, Duration::from_secs(6 * 60)); + assert_eq!(retention, 241); + } +} diff --git a/xline/src/storage/compact/revision_compactor.rs b/xline/src/storage/compact/revision_compactor.rs index e79dc510d..f66b7b1de 100644 --- a/xline/src/storage/compact/revision_compactor.rs +++ b/xline/src/storage/compact/revision_compactor.rs @@ -62,7 +62,7 @@ impl Compactor for RevisionCompactor { #[allow(clippy::integer_arithmetic)] async fn run(&self) { - let prev = 0; + let last_revision = 0; let shutdown_trigger = self.shutdown_trigger.listen(); let mut ticker = tokio::time::interval(CHECK_INTERVAL); tokio::pin!(shutdown_trigger); @@ -81,7 +81,7 @@ impl Compactor for RevisionCompactor { } let target_revision = self.revision_getter.get().overflow_sub(self.retention); - if target_revision <= 0 || target_revision == prev { + if target_revision <= 0 || target_revision == last_revision { continue; } From 145264ebbd56dd5f5c18602ff4c13ef82e3fe6a9 Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Thu, 27 Jul 2023 14:40:39 +0800 Subject: [PATCH 4/9] feat(compactor): make auto compactor configurable Refs: 188 Signed-off-by: Phoeniix Zhao --- Cargo.lock | 1 + utils/Cargo.toml | 1 + utils/src/config.rs | 86 ++++++++++++++++++- utils/src/lib.rs | 46 +++++++++- xline/src/main.rs | 36 +++++++- xline/src/server/xline_server.rs | 25 +++--- xline/src/state.rs | 16 ++-- xline/src/storage/compact/mod.rs | 32 ++++--- .../src/storage/compact/periodic_compactor.rs | 49 +++++++---- .../src/storage/compact/revision_compactor.rs | 2 +- 10 files changed, 244 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 59e9b72a1..c42af8dee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3010,6 +3010,7 @@ name = "utils" version = "0.1.0" dependencies = [ "async-trait", + "clippy-utilities 0.2.0", "derive_builder", "getset", "madsim-tokio", diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 3c49e1170..3ea216d72 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -36,6 +36,7 @@ toml = "0.5" thiserror = "1.0.31" tracing-appender = "0.2" derive_builder = "0.12.0" +clippy-utilities = "0.2.0" [dev-dependencies] opentelemetry-jaeger = "0.17.0" diff --git a/utils/src/config.rs b/utils/src/config.rs index 01bf30809..4c531214d 100644 --- a/utils/src/config.rs +++ b/utils/src/config.rs @@ -133,6 +133,9 @@ pub struct CompactConfig { #[getset(get = "pub")] #[serde(with = "duration_format", default = "default_compact_sleep_interval")] compact_sleep_interval: Duration, + /// The auto compactor config + #[getset(get = "pub")] + auto_compact_config: Option, } impl Default for CompactConfig { @@ -141,6 +144,7 @@ impl Default for CompactConfig { Self { compact_batch_size: default_compact_batch_size(), compact_sleep_interval: default_compact_sleep_interval(), + auto_compact_config: None, } } } @@ -149,10 +153,15 @@ impl CompactConfig { /// Create a new compact config #[must_use] #[inline] - pub fn new(compact_batch_size: usize, compact_sleep_interval: Duration) -> Self { + pub fn new( + compact_batch_size: usize, + compact_sleep_interval: Duration, + auto_compact_config: Option, + ) -> Self { Self { compact_batch_size, compact_sleep_interval, + auto_compact_config, } } } @@ -475,6 +484,23 @@ impl Default for ServerTimeout { } } +/// Auto Compactor Configuration +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] +#[serde( + tag = "mode", + content = "retention", + rename_all(deserialize = "lowercase") +)] +pub enum AutoCompactConfig { + /// auto 
periodic compactor + #[serde(with = "duration_format")] + Periodic(Duration), + /// auto revision compactor + Revision(i64), +} + /// Storage Configuration #[allow(clippy::module_name_repetitions)] #[non_exhaustive] @@ -751,6 +777,10 @@ mod tests { compact_batch_size = 123 compact_sleep_interval = '5ms' + [compact.auto_compact_config] + mode = 'periodic' + retention = '10h' + [log] path = '/var/log/xline' rotation = 'daily' @@ -785,6 +815,7 @@ mod tests { Duration::from_millis(20), Duration::from_secs(1), ); + assert_eq!( config.cluster, ClusterConfig::new( @@ -825,7 +856,10 @@ mod tests { config.compact, CompactConfig { compact_batch_size: 123, - compact_sleep_interval: Duration::from_millis(5) + compact_sleep_interval: Duration::from_millis(5), + auto_compact_config: Some(AutoCompactConfig::Periodic(Duration::from_secs( + 10 * 60 * 60 + ))) } ); } @@ -907,4 +941,52 @@ mod tests { ); assert_eq!(config.compact, CompactConfig::default()); } + + #[allow(clippy::unwrap_used)] + #[test] + fn test_auto_revision_compactor_config_should_be_loaded() { + let config: XlineServerConfig = toml::from_str( + r#"[cluster] + name = 'node1' + is_leader = true + + [cluster.members] + node1 = '127.0.0.1:2379' + node2 = '127.0.0.1:2380' + node3 = '127.0.0.1:2381' + + [cluster.storage] + + [log] + path = '/var/log/xline' + + [storage] + engine = 'memory' + + [compact] + + [compact.auto_compact_config] + mode = 'revision' + retention = 10000 + + [trace] + jaeger_online = false + jaeger_offline = false + jaeger_output_dir = './jaeger_jsons' + jaeger_level = 'info' + + [auth] + # auth_public_key = './public_key'.pem' + # auth_private_key = './private_key.pem'"#, + ) + .unwrap(); + + assert_eq!( + config.compact, + CompactConfig { + auto_compact_config: Some(AutoCompactConfig::Revision(10000)), + ..Default::default() + } + ); + } } diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 8493374d9..502b7c6a4 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -138,6 +138,7 @@ use std::{collections::HashMap, time::Duration}; +use clippy_utilities::OverflowArithmetic; use thiserror::Error; use crate::config::{ClusterRange, LevelConfig, RotationConfig}; @@ -156,6 +157,13 @@ pub mod tokio_lock; /// utils for pass span context pub mod tracing; +/// seconds per minute +const SECS_PER_MINUTE: u64 = 60; +/// seconds per hour +const SECS_PER_HOUR: u64 = 3600; +/// seconds per day, equals to 24 * 60 * 60 = 86400 +const SECS_PER_DAY: u64 = 86400; + /// Config Parse Error #[derive(Debug, Error)] #[non_exhaustive] @@ -236,9 +244,36 @@ pub fn parse_duration(s: &str) -> Result { "the value of time should not be empty ({s})" ))) } + } else if s.ends_with('m') { + if let Some(dur) = s.strip_suffix('m') { + let minutes: u64 = dur.parse()?; + Ok(Duration::from_secs(minutes.overflow_mul(SECS_PER_MINUTE))) + } else { + Err(ConfigParseError::InvalidValue(format!( + "the value of time should not be empty ({s})" + ))) + } + } else if s.ends_with('h') { + if let Some(dur) = s.strip_suffix('h') { + let hours: u64 = dur.parse()?; + Ok(Duration::from_secs(hours.overflow_mul(SECS_PER_HOUR))) + } else { + Err(ConfigParseError::InvalidValue(format!( + "the value of time should not be empty ({s})" + ))) + } + } else if s.ends_with('d') { + if let Some(dur) = s.strip_suffix('d') { + let days: u64 = dur.parse()?; + Ok(Duration::from_secs(days.overflow_mul(SECS_PER_DAY))) + } else { + Err(ConfigParseError::InvalidValue(format!( + "the value of time should not be empty ({s})" + ))) + } } else { Err(ConfigParseError::InvalidUnit(format!( - "the 
unit of time should be one of 'us', 'ms' or 's'({s})" + "the unit of time should be one of 'us', 'ms', 's', 'm', 'h' or 'd' ({s})" ))) } } @@ -322,6 +357,15 @@ mod test { assert_eq!(parse_duration("5s").unwrap(), Duration::from_secs(5)); assert_eq!(parse_duration("3ms").unwrap(), Duration::from_millis(3)); assert_eq!(parse_duration("1us").unwrap(), Duration::from_micros(1)); + assert_eq!(parse_duration("3m").unwrap(), Duration::from_secs(180)); + assert_eq!( + parse_duration("2h").unwrap(), + Duration::from_secs(2 * SECS_PER_HOUR) + ); + assert_eq!( + parse_duration("30d").unwrap(), + Duration::from_secs(30 * SECS_PER_DAY) + ); let results = vec![ parse_duration("hello world"), parse_duration("5x"), diff --git a/xline/src/main.rs b/xline/src/main.rs index c3a259b1d..fbfb59076 100644 --- a/xline/src/main.rs +++ b/xline/src/main.rs @@ -157,8 +157,9 @@ use utils::{ default_propose_timeout, default_range_retry_timeout, default_retry_timeout, default_rotation, default_rpc_timeout, default_server_wait_synced_timeout, default_sync_victims_interval, default_watch_progress_notify_interval, file_appender, - AuthConfig, ClientTimeout, ClusterConfig, CompactConfig, CurpConfigBuilder, LevelConfig, - LogConfig, RotationConfig, ServerTimeout, StorageConfig, TraceConfig, XlineServerConfig, + AuthConfig, AutoCompactConfig, ClientTimeout, ClusterConfig, CompactConfig, + CurpConfigBuilder, LevelConfig, LogConfig, RotationConfig, ServerTimeout, StorageConfig, + TraceConfig, XlineServerConfig, }, parse_batch_bytes, parse_duration, parse_log_level, parse_members, parse_rotation, }; @@ -269,6 +270,15 @@ struct ServerArgs { /// Interval between two compaction operations [default: 10ms] #[clap(long, value_parser = parse_duration)] compact_sleep_interval: Option, + /// Auto compact mode + #[clap(long)] + auto_compact_mode: Option, + /// Auto periodic compact retention + #[clap(long, value_parser = parse_duration)] + auto_periodic_retention: Option, + /// Auto revision compact retention + #[clap(long)] + auto_revision_retention: Option, } impl From for XlineServerConfig { @@ -335,10 +345,32 @@ impl From for XlineServerConfig { args.jaeger_level, ); let auth = AuthConfig::new(args.auth_public_key, args.auth_private_key); + let auto_comapctor_cfg = if let Some(mode) = args.auto_compact_mode { + match mode.as_str() { + "periodic" => { + let period = args.auto_periodic_retention.unwrap_or_else(|| { + panic!("missing auto_periodic_retention argument"); + }); + Some(AutoCompactConfig::Periodic(period)) + } + "revision" => { + let retention = args.auto_revision_retention.unwrap_or_else(|| { + panic!("missing auto_revision_retention argument"); + }); + Some(AutoCompactConfig::Revision(retention)) + } + &_ => unreachable!( + "xline only supports two auto-compaction modes: periodic, revision" + ), + } + } else { + None + }; let compact = CompactConfig::new( args.compact_batch_size, args.compact_sleep_interval .unwrap_or_else(default_compact_sleep_interval), + auto_comapctor_cfg, ); XlineServerConfig::new(cluster, storage, log, trace, auth, compact) } diff --git a/xline/src/server/xline_server.rs b/xline/src/server/xline_server.rs index c4fcc91a5..601011ca4 100644 --- a/xline/src/server/xline_server.rs +++ b/xline/src/server/xline_server.rs @@ -33,7 +33,7 @@ use crate::{ }, state::State, storage::{ - compact::{compact_bg_task, COMPACT_CHANNEL_SIZE, auto_compactor}, + compact::{auto_compactor, compact_bg_task, COMPACT_CHANNEL_SIZE}, index::Index, kvwatcher::KvWatcher, lease_store::LeaseCollection, @@ -327,16 +327,21 @@ impl 
XlineServer { .await, ); - let auto_compactor = auto_compactor( - self.is_leader, - Arc::clone(&client), - header_gen.revision_arc(), - Arc::clone(&self.shutdown_trigger), - 1000, - ).await; - + let auto_compactor = if let Some(auto_config_cfg) = *self.compact_cfg.auto_compact_config() + { + auto_compactor( + self.is_leader, + Arc::clone(&client), + header_gen.revision_arc(), + Arc::clone(&self.shutdown_trigger), + auto_config_cfg, + ) + .await + } else { + None + }; - let state = State::new(Arc::clone(&lease_storage), Some(auto_compactor)); + let state = State::new(Arc::clone(&lease_storage), auto_compactor); let curp_server = CurpServer::new( Arc::clone(&self.cluster_info), self.is_leader, diff --git a/xline/src/state.rs b/xline/src/state.rs index b8950e6c7..44ebfa3f2 100644 --- a/xline/src/state.rs +++ b/xline/src/state.rs @@ -2,11 +2,7 @@ use std::{sync::Arc, time::Duration}; use curp::role_change::RoleChange; -use crate::storage::{ - storage_api::StorageApi, - LeaseStore, - compact::Compactor, -}; +use crate::storage::{compact::Compactor, storage_api::StorageApi, LeaseStore}; /// State of current node #[derive(Debug)] @@ -35,7 +31,13 @@ impl RoleChange for State { impl State { /// Create a new State - pub(super) fn new(lease_storage: Arc>, auto_compactor: Option>) -> Self { - Self { lease_storage, auto_compactor } + pub(super) fn new( + lease_storage: Arc>, + auto_compactor: Option>, + ) -> Self { + Self { + lease_storage, + auto_compactor, + } } } diff --git a/xline/src/storage/compact/mod.rs b/xline/src/storage/compact/mod.rs index 34c5a7fc6..e738a69fe 100644 --- a/xline/src/storage/compact/mod.rs +++ b/xline/src/storage/compact/mod.rs @@ -3,8 +3,10 @@ use std::{sync::Arc, time::Duration}; use async_trait::async_trait; use curp::{client::Client, cmd::ProposeId, error::ProposeError}; use event_listener::Event; +use periodic_compactor::PeriodicCompactor; use revision_compactor::RevisionCompactor; use tokio::{sync::mpsc::Receiver, time::sleep}; +use utils::config::AutoCompactConfig; use uuid::Uuid; use super::{ @@ -67,20 +69,28 @@ pub(crate) async fn auto_compactor( client: Arc>, revision_getter: Arc, shutdown_trigger: Arc, - retention: i64, -) -> Arc { - let auto_compactor: Arc = RevisionCompactor::new_arc( - is_leader, - client, - revision_getter, - shutdown_trigger, - retention, - ); + auto_compact_cfg: AutoCompactConfig, +) -> Option> { + let auto_compactor: Arc = match auto_compact_cfg { + AutoCompactConfig::Periodic(period) => { + PeriodicCompactor::new_arc(is_leader, client, revision_getter, shutdown_trigger, period) + } + AutoCompactConfig::Revision(retention) => RevisionCompactor::new_arc( + is_leader, + client, + revision_getter, + shutdown_trigger, + retention, + ), + _ => { + unreachable!("xline only supports two auto-compaction modes: periodic, revision") + } + }; let compactor_handle = Arc::clone(&auto_compactor); - let _hd = tokio::spawn( async move{ + let _hd = tokio::spawn(async move { auto_compactor.run().await; }); - compactor_handle + Some(compactor_handle) } /// background compact executor diff --git a/xline/src/storage/compact/periodic_compactor.rs b/xline/src/storage/compact/periodic_compactor.rs index 97ce1544a..a9223c6f3 100644 --- a/xline/src/storage/compact/periodic_compactor.rs +++ b/xline/src/storage/compact/periodic_compactor.rs @@ -1,20 +1,19 @@ use std::{ + cmp::Ordering, sync::{ atomic::{AtomicBool, Ordering::Relaxed}, Arc, }, time::{Duration, Instant}, - cmp::Ordering, }; +use clippy_utilities::OverflowArithmetic; use event_listener::Event; use 
tracing::{info, warn}; -use clippy_utilities::OverflowArithmetic; use super::{Compactable, Compactor}; use crate::revision_number::RevisionNumberGenerator; - /// `RevisionWindow` is a ring buffer used to store periodically sampled revision. struct RevisionWindow { /// inner ring buffer @@ -22,7 +21,7 @@ struct RevisionWindow { /// head index of the ring buffer cursor: usize, /// sample total amount - retention: usize + retention: usize, } impl RevisionWindow { @@ -38,24 +37,36 @@ impl RevisionWindow { /// Store the revision into the inner ring buffer #[allow(clippy::integer_arithmetic)] fn sample(&mut self, revision: i64) { - self.cursor = (self.cursor + 1) % self.retention; // it's ok to do so since cursor will never overflow + self.cursor = (self.cursor + 1) % self.retention; // it's ok to do so since cursor will never overflow match self.ring_buf.len().cmp(&self.retention) { - Ordering::Less => {self.ring_buf.push(revision)}, + Ordering::Less => self.ring_buf.push(revision), Ordering::Equal => { - if let Some(element) = self.ring_buf.get_mut(self.cursor){ + if let Some(element) = self.ring_buf.get_mut(self.cursor) { *element = revision; } else { - unreachable!("ring_buf ({:?}) at {} should not be None", self.ring_buf, self.cursor); + unreachable!( + "ring_buf ({:?}) at {} should not be None", + self.ring_buf, self.cursor + ); } - }, - Ordering::Greater => {unreachable!("the length of RevisionWindow should be less than {}", self.retention)} + } + Ordering::Greater => { + unreachable!( + "the length of RevisionWindow should be less than {}", + self.retention + ) + } } } /// Retrieve the expired revision that is sampled period ago #[allow(clippy::indexing_slicing, clippy::integer_arithmetic)] fn expired_revision(&self) -> Option { - debug_assert!(self.ring_buf.len() <= self.retention, "the length of RevisionWindow should be less than {}", self.retention); + debug_assert!( + self.ring_buf.len() <= self.retention, + "the length of RevisionWindow should be less than {}", + self.retention + ); if self.ring_buf.len() < self.retention { None } else { @@ -80,7 +91,7 @@ pub(crate) struct PeriodicCompactor { period: Duration, } -impl PeriodicCompactor{ +impl PeriodicCompactor { #[allow(dead_code)] /// Creates a new revision compactor pub(super) fn new_arc( @@ -108,11 +119,16 @@ fn sample_config(period: Duration) -> (Duration, usize) { Ordering::Equal | Ordering::Greater => one_hour, }; let divisor = 10; - let check_interval = base_interval.checked_div(divisor).unwrap_or_else(|| {unreachable!("duration divisor should not be 0")}); + let check_interval = base_interval + .checked_div(divisor) + .unwrap_or_else(|| unreachable!("duration divisor should not be 0")); let check_interval_secs = check_interval.as_secs(); let periodic_secs = period.as_secs(); - let length = periodic_secs.overflow_div(check_interval_secs).overflow_add(1); - let retention = usize::try_from(length).unwrap_or_else(|e| {panic!("auto compact period is too large: {e}")}); + let length = periodic_secs + .overflow_div(check_interval_secs) + .overflow_add(1); + let retention = + usize::try_from(length).unwrap_or_else(|e| panic!("auto compact period is too large: {e}")); (check_interval, retention) } @@ -151,7 +167,8 @@ impl Compactor for PeriodicCompactor { let target_revision = revision_window.expired_revision(); if target_revision != last_revision { - let revision = target_revision.unwrap_or_else(|| {unreachable!("target revision shouldn't be None")}); + let revision = target_revision + .unwrap_or_else(|| unreachable!("target 
revision shouldn't be None")); let now = Instant::now(); info!( "starting auto periodic compaction, revision = {}, period = {:?}", diff --git a/xline/src/storage/compact/revision_compactor.rs b/xline/src/storage/compact/revision_compactor.rs index f66b7b1de..e2c27c187 100644 --- a/xline/src/storage/compact/revision_compactor.rs +++ b/xline/src/storage/compact/revision_compactor.rs @@ -31,7 +31,7 @@ pub(crate) struct RevisionCompactor { retention: i64, } -impl RevisionCompactor{ +impl RevisionCompactor { /// Creates a new revision compactor pub(super) fn new_arc( is_leader: bool, From 856dcfeade7ee7b73179e2c7e7ba844f0431c18f Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Thu, 27 Jul 2023 20:58:40 +0800 Subject: [PATCH 5/9] refactor: improve the compatibility with etcdctl Signed-off-by: Phoeniix Zhao --- xline/src/server/kv_server.rs | 2 +- xline/src/server/watch_server.rs | 112 ++++++++++++++++--------------- xline/src/storage/kvwatcher.rs | 53 +++++++++++---- 3 files changed, 98 insertions(+), 69 deletions(-) diff --git a/xline/src/server/kv_server.rs b/xline/src/server/kv_server.rs index 9ca15bea0..38f6e6702 100644 --- a/xline/src/server/kv_server.rs +++ b/xline/src/server/kv_server.rs @@ -191,7 +191,7 @@ where ) -> Result<(), tonic::Status> { (range_revision <= 0 || range_revision >= compacted_revision) .then_some(()) - .ok_or(tonic::Status::invalid_argument(format!( + .ok_or(tonic::Status::out_of_range(format!( "required revision {range_revision} has been compacted, compacted revision is {compacted_revision}" ))) } diff --git a/xline/src/server/watch_server.rs b/xline/src/server/watch_server.rs index 383a7b6eb..a27aba494 100644 --- a/xline/src/server/watch_server.rs +++ b/xline/src/server/watch_server.rs @@ -198,17 +198,6 @@ where /// Handle `WatchCreateRequest` async fn handle_watch_create(&mut self, req: WatchCreateRequest) { - let compacted_revision = self.kv_watcher.compacted_revision(); - if req.start_revision < compacted_revision { - let result = Err(tonic::Status::invalid_argument(format!( - "required revision {} has been compacted, compacted revision is {}", - req.start_revision, compacted_revision - ))); - if self.response_tx.send(result).await.is_err() { - self.stop_notify.notify(1); - } - return; - } let Some(watch_id) = self.validate_watch_id(req.watch_id) else { let result = Err(tonic::Status::already_exists(format!( "Watch ID {} has already been used", @@ -250,7 +239,6 @@ where header: Some(self.header_gen.gen_header()), watch_id, created: true, - compact_revision: compacted_revision, ..WatchResponse::default() }; if self.response_tx.send(Ok(response)).await.is_err() { @@ -301,31 +289,46 @@ where /// Handle watch event async fn handle_watch_event(&mut self, mut watch_event: WatchEvent) { - let mut events = watch_event.take_events(); - if events.is_empty() { - return; - } let watch_id = watch_event.watch_id(); - if self.prev_kv.contains(&watch_id) { - for ev in &mut events { - if !ev.is_create() { - let kv = ev - .kv - .as_ref() - .unwrap_or_else(|| panic!("event.kv can't be None")); - ev.prev_kv = self.kv_watcher.get_prev_kv(kv); + let response = if watch_event.compacted() { + WatchResponse { + header: Some(ResponseHeader { + revision: watch_event.revision(), + ..ResponseHeader::default() + }), + watch_id, + compact_revision: self.kv_watcher.compacted_revision(), + canceled: true, + ..WatchResponse::default() + } + } else { + let mut events = watch_event.take_events(); + if events.is_empty() { + return; + } + + if self.prev_kv.contains(&watch_id) { + for ev in &mut 
events { + if !ev.is_create() { + let kv = ev + .kv + .as_ref() + .unwrap_or_else(|| panic!("event.kv can't be None")); + ev.prev_kv = self.kv_watcher.get_prev_kv(kv); + } } } - } - let response = WatchResponse { - header: Some(ResponseHeader { - revision: watch_event.revision(), - ..ResponseHeader::default() - }), - watch_id, - events, - ..WatchResponse::default() + WatchResponse { + header: Some(ResponseHeader { + revision: watch_event.revision(), + ..ResponseHeader::default() + }), + watch_id, + events, + ..WatchResponse::default() + } }; + if self.response_tx.send(Ok(response)).await.is_err() { self.stop_notify.notify(1); } @@ -791,8 +794,7 @@ mod test { })), }; req_tx.send(Ok(create_watch_req(1, 2))).await.unwrap(); - req_tx.send(Ok(create_watch_req(2, 3))).await.unwrap(); - req_tx.send(Ok(create_watch_req(3, 4))).await.unwrap(); + // req_tx.send(Ok(create_watch_req(3, 4))).await.unwrap(); let (res_tx, mut res_rx) = mpsc::channel(CHANNEL_SIZE); let _hd = tokio::spawn(WatchServer::::task( Arc::clone(&next_id_gen), @@ -803,24 +805,26 @@ mod test { default_watch_progress_notify_interval(), )); - for i in 0..3 { - let watch_res = res_rx.recv().await.unwrap(); - if i == 0 { - if let Err(e) = watch_res { - assert_eq!(e.code(), tonic::Code::InvalidArgument, - "watch a compacted revision should return invalid_argument error, but found {e:?}" - ); - } else { - unreachable!( - "watch create request with a compacted revision should not be successful" - ) - } - } else { - assert!( - watch_res.is_ok(), - "watch create request with a valid revision should be successful" - ); - } - } + // It's allowed to create a compacted watch request, but it will immediately cancel. Doing so is for the compatibility with etcdctl + let watch_create_success_res = res_rx.recv().await.unwrap().unwrap(); + assert!(watch_create_success_res.created); + assert_eq!(watch_create_success_res.watch_id, 1); + let watch_cancel_res = res_rx.recv().await.unwrap().unwrap(); + assert!(watch_cancel_res.canceled); + assert_eq!(watch_cancel_res.compact_revision, 3); + + req_tx.send(Ok(create_watch_req(2, 3))).await.unwrap(); + let watch_create_success_res = res_rx.recv().await.unwrap().unwrap(); + assert!(watch_create_success_res.created); + assert_eq!(watch_create_success_res.compact_revision, 0); + assert_eq!(watch_create_success_res.watch_id, 2); + + let watch_event_res = res_rx.recv().await.unwrap().unwrap(); + assert!(!watch_event_res.created); + assert!(!watch_event_res.canceled); + assert_eq!(watch_event_res.compact_revision, 0); + assert_eq!(watch_event_res.watch_id, 2); + + } } diff --git a/xline/src/storage/kvwatcher.rs b/xline/src/storage/kvwatcher.rs index 0f373c9db..a6e38b408 100644 --- a/xline/src/storage/kvwatcher.rs +++ b/xline/src/storage/kvwatcher.rs @@ -60,6 +60,8 @@ struct Watcher { stop_notify: Arc, /// Sender of watch event event_tx: mpsc::Sender, + /// Compacted flag + compacted: bool, } impl PartialEq for Watcher { @@ -85,6 +87,7 @@ impl Watcher { filters: Vec, stop_notify: Arc, event_tx: mpsc::Sender, + compacted: bool, ) -> Self { Self { key_range, @@ -93,6 +96,7 @@ impl Watcher { filters, stop_notify, event_tx, + compacted, } } @@ -123,24 +127,20 @@ impl Watcher { &mut self, (revision, events): (i64, Vec), ) -> Result<(), TrySendError> { - if revision < self.start_rev { - return Ok(()); - } - let events = self.filter_events(events); - if events.is_empty() { - return Ok(()); - } let watch_id = self.watch_id(); - debug!( - watch_id, - revision, - events_len = events.len(), - "try to send watch response" - ); 
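+        // Build the response first: a compacted watcher always sends one (possibly empty) event batch so the server can cancel it, while a live watcher still skips stale or empty updates.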
+ let events = self.filter_events(events); + let events_len = events.len(); let watch_event = WatchEvent { id: watch_id, events, revision, + compacted: self.compacted, + }; + if !self.compacted { + if revision < self.start_rev || 0 == events_len { + return Ok(()); + } + debug!(watch_id, revision, events_len, "try to send watch response"); }; match self.event_tx.try_send(watch_event) { @@ -238,6 +238,7 @@ impl WatcherMap { id: watch_id, revision: updates.0, events: updates.1, + compacted: false, }; assert!( self.victims @@ -317,6 +318,7 @@ where stop_notify: Arc, event_tx: mpsc::Sender, ) { + let compacted = start_rev < self.compacted_revision(); let mut watcher = Watcher::new( key_range.clone(), id, @@ -324,8 +326,22 @@ where filters, stop_notify, event_tx, + compacted, ); let mut watcher_map_w = self.watcher_map.write(); + if compacted { + debug!("The revision {watcher:?} required has been compacted"); + if let Err(TrySendError::Full(watch_event)) = watcher.notify((0, vec![])) { + assert!( + watcher_map_w + .victims + .insert(watcher, (watch_event.revision, watch_event.events)) + .is_none(), + "can't insert a watcher to victims twice" + ); + }; + return; + } let initial_events = if start_rev == 0 { vec![] @@ -446,7 +462,9 @@ where watche_id = watcher.watch_id(), "watcher synced by sync_victims_task" ); - watcher_map_w.register(watcher); + if !watcher.compacted { + watcher_map_w.register(watcher); + } } } if !new_victims.is_empty() { @@ -507,6 +525,8 @@ pub(crate) struct WatchEvent { events: Vec, /// Revision when this event is generated revision: i64, + /// Compacted WatchEvent + compacted: bool, } impl WatchEvent { @@ -524,6 +544,11 @@ impl WatchEvent { pub(crate) fn take_events(&mut self) -> Vec { std::mem::take(&mut self.events) } + + /// Check whether the `WatchEvent` is a compacted `WatchEvent` or not. + pub(crate) fn compacted(&self) -> bool { + self.compacted + } } /// Get the last revision of a event slice From 2e5fcf7ccb9ee906fe79c28356444b314138ec5f Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Thu, 27 Jul 2023 23:46:22 +0800 Subject: [PATCH 6/9] test: add validation test for compaction feature Signed-off-by: Phoeniix Zhao --- scripts/validation_test.sh | 34 ++++++++++++++++++++++++++++++++ xline/src/server/watch_server.rs | 2 -- xline/src/storage/kvwatcher.rs | 2 +- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/scripts/validation_test.sh b/scripts/validation_test.sh index 90f367f8c..ca3846835 100755 --- a/scripts/validation_test.sh +++ b/scripts/validation_test.sh @@ -25,6 +25,25 @@ run_with_expect() { fi } +# run a command with expect output, based on key word match +# args: +# $1: command to run +# $2: expect output +run_with_match() { + cmd="${1}" + res=$(eval ${cmd} 2>&1) + expect=$(echo -e ${2}) + if echo "${res}" | grep -q "${expect}"; then + echo "command run success" + else + echo "command run failed" + echo "command: ${cmd}" + echo "expect: ${expect}" + echo "result: ${res}" + exit 1 + fi +} + # validate kv requests kv_validation() { echo "kv validation test running..." @@ -189,6 +208,21 @@ maintenance_validation() { echo "maintenance validation test passed" } +# validate compact requests +compact_validation() { + echo "compact validation test running..." 
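+    # six puts advance the key's revision to 7, so compacting at revision 5 should reject later reads and watches at revision 4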
+ for value in "value1" "value2" "value3" "value4" "value5" "value6"; + do + run_with_expect "${ETCDCTL} put key ${value}" "OK" + done + run_with_expect "${ETCDCTL} get --rev=4 key" "key\nvalue3" + run_with_expect "${ETCDCTL} compact 5" "compacted revision 5" + run_with_match "${ETCDCTL} get --rev=4 key" "required revision 4 has been compacted, compacted revision is 5" + run_with_match "${ETCDCTL} watch --rev=4 key " "watch was canceled (etcdserver: mvcc: required revision has been compacted)" + echo "compact validation test pass..." +} + +compact_validation kv_validation watch_validation lease_validation diff --git a/xline/src/server/watch_server.rs b/xline/src/server/watch_server.rs index a27aba494..b3345ed32 100644 --- a/xline/src/server/watch_server.rs +++ b/xline/src/server/watch_server.rs @@ -824,7 +824,5 @@ mod test { assert!(!watch_event_res.canceled); assert_eq!(watch_event_res.compact_revision, 0); assert_eq!(watch_event_res.watch_id, 2); - - } } diff --git a/xline/src/storage/kvwatcher.rs b/xline/src/storage/kvwatcher.rs index a6e38b408..964553803 100644 --- a/xline/src/storage/kvwatcher.rs +++ b/xline/src/storage/kvwatcher.rs @@ -318,7 +318,7 @@ where stop_notify: Arc, event_tx: mpsc::Sender, ) { - let compacted = start_rev < self.compacted_revision(); + let compacted = start_rev != 0 && start_rev < self.compacted_revision(); let mut watcher = Watcher::new( key_range.clone(), id, From 21983e9669ce45153e582c01e10693c13406cb96 Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Sat, 29 Jul 2023 12:44:06 +0800 Subject: [PATCH 7/9] test: refactor auto compactor implementation and add unit tests to cover Refs: 188 Signed-off-by: Phoeniix Zhao --- .../src/storage/compact/periodic_compactor.rs | 132 ++++++++++++++---- .../src/storage/compact/revision_compactor.rs | 97 +++++++++---- 2 files changed, 171 insertions(+), 58 deletions(-) diff --git a/xline/src/storage/compact/periodic_compactor.rs b/xline/src/storage/compact/periodic_compactor.rs index a9223c6f3..b3aba39e6 100644 --- a/xline/src/storage/compact/periodic_compactor.rs +++ b/xline/src/storage/compact/periodic_compactor.rs @@ -92,11 +92,10 @@ pub(crate) struct PeriodicCompactor { } impl PeriodicCompactor { - #[allow(dead_code)] /// Creates a new revision compactor pub(super) fn new_arc( is_leader: bool, - client: Arc, + client: Arc, revision_getter: Arc, shutdown_trigger: Arc, period: Duration, @@ -109,6 +108,44 @@ impl PeriodicCompactor { period, }) } + + /// perform auto compaction logic + async fn do_compact( + &self, + last_revision: Option, + revision_window: &RevisionWindow, + ) -> Option { + if !self.is_leader.load(Relaxed) { + return None; + } + let target_revision = revision_window.expired_revision(); + if target_revision == last_revision { + return None; + } + let revision = + target_revision.unwrap_or_else(|| unreachable!("target revision shouldn't be None")); + let now = Instant::now(); + info!( + "starting auto periodic compaction, revision = {}, period = {:?}", + revision, self.period + ); + // TODO: add more error processing logic + if let Err(e) = self.client.compact(revision).await { + warn!( + "failed auto revision compaction, revision = {}, period = {:?}, error: {:?}", + revision, self.period, e + ); + None + } else { + info!( + "completed auto revision compaction, revision = {}, period = {:?}, took {:?}", + revision, + self.period, + now.elapsed().as_secs() + ); + target_revision + } + } } /// Calculate the sample frequency and the total amount of samples. 
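
Note on the hunk above: it pulls the compaction decision out of the run() loop into a do_compact helper, so the leader check and the "skip a revision that was already compacted" guard can be unit tested without driving the ticker. The sketch below restates that decision flow in isolation; the function name should_compact, the synchronous signature, and the bare i64 revisions are illustrative assumptions for this note, not the crate's actual API.

/// Decide whether an auto compaction should run, mirroring the guards in the
/// patch: skip when this node is not the leader, or when the revision that
/// fell out of the sampling window is the one we already compacted last time.
fn should_compact(
    is_leader: bool,
    last_revision: Option<i64>,
    expired_revision: Option<i64>,
) -> Option<i64> {
    if !is_leader {
        return None;
    }
    match expired_revision {
        // Only compact when the window yields a new, not-yet-compacted revision.
        Some(rev) if expired_revision != last_revision => Some(rev),
        _ => None,
    }
}

fn main() {
    // A follower never triggers compaction.
    assert_eq!(should_compact(false, None, Some(5)), None);
    // The leader compacts a freshly expired revision.
    assert_eq!(should_compact(true, None, Some(5)), Some(5));
    // Re-running with the same expired revision is a no-op.
    assert_eq!(should_compact(true, Some(5), Some(5)), None);
}
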
@@ -144,7 +181,7 @@ impl Compactor for PeriodicCompactor { #[allow(clippy::integer_arithmetic)] async fn run(&self) { - let mut last_revision = None; + let mut last_revision: Option = None; let shutdown_trigger = self.shutdown_trigger.listen(); let (sample_frequency, sample_total) = sample_config(self.period); let mut ticker = tokio::time::interval(sample_frequency); @@ -154,8 +191,8 @@ impl Compactor for PeriodicCompactor { revision_window.sample(self.revision_getter.get()); tokio::select! { _ = ticker.tick() => { - if !self.is_leader.load(Relaxed) { - continue; + if let Some(last_compacted_rev) = self.do_compact(last_revision, &revision_window).await { + last_revision = Some(last_compacted_rev); } } // To ensure that each iteration invokes the same `shutdown_trigger` and keeps @@ -164,32 +201,6 @@ impl Compactor for PeriodicCompactor { break; } } - - let target_revision = revision_window.expired_revision(); - if target_revision != last_revision { - let revision = target_revision - .unwrap_or_else(|| unreachable!("target revision shouldn't be None")); - let now = Instant::now(); - info!( - "starting auto periodic compaction, revision = {}, period = {:?}", - revision, self.period - ); - // TODO: add more error processing logic - if let Err(e) = self.client.compact(revision).await { - warn!( - "failed auto revision compaction, revision = {}, period = {:?}, error: {:?}", - revision, self.period, e - ); - } else { - info!( - "completed auto revision compaction, revision = {}, period = {:?}, took {:?}", - revision, - self.period, - now.elapsed().as_secs() - ); - last_revision = Some(revision); - } - } } } } @@ -197,6 +208,8 @@ impl Compactor for PeriodicCompactor { #[cfg(test)] mod test { use super::*; + use crate::storage::compact::MockCompactable; + #[test] fn revision_window_should_work() { let mut rw = RevisionWindow::new(3); @@ -232,4 +245,61 @@ mod test { assert_eq!(interval, Duration::from_secs(6 * 60)); assert_eq!(retention, 241); } + + #[tokio::test] + async fn periodic_compactor_should_work_in_normal_path() { + let mut revision_window = RevisionWindow::new(11); + // revision_window: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] + for revision in 1..=11 { + revision_window.sample(revision); + } + let mut compactable = MockCompactable::new(); + compactable.expect_compact().times(3).returning(|_| Ok(())); + let shutdown_trigger = Arc::new(Event::new()); + let revision_gen = Arc::new(RevisionNumberGenerator::new(1)); + let periodic_compactor = PeriodicCompactor::new_arc( + true, + Arc::new(compactable), + revision_gen, + shutdown_trigger, + Duration::from_secs(10), + ); + // auto_compactor works successfully + assert_eq!( + periodic_compactor.do_compact(None, &revision_window).await, + Some(1) + ); + revision_window.sample(12); + assert_eq!( + periodic_compactor + .do_compact(Some(1), &revision_window) + .await, + Some(2) + ); + periodic_compactor.pause(); + revision_window.sample(13); + assert!(periodic_compactor + .do_compact(Some(2), &revision_window) + .await + .is_none()); + revision_window.sample(14); + assert!(periodic_compactor + .do_compact(Some(2), &revision_window) + .await + .is_none()); + periodic_compactor.resume(); + // revision_window: [12, 13, 14, 4, 5, 6, 7, 8, 9, 10, 11] + assert_eq!( + periodic_compactor + .do_compact(Some(3), &revision_window) + .await, + Some(4) + ); + + // auto compactor should skip those revisions which have been auto compacted. 
+ assert!(periodic_compactor + .do_compact(Some(4), &revision_window) + .await + .is_none()); + } } diff --git a/xline/src/storage/compact/revision_compactor.rs b/xline/src/storage/compact/revision_compactor.rs index e2c27c187..9b3327091 100644 --- a/xline/src/storage/compact/revision_compactor.rs +++ b/xline/src/storage/compact/revision_compactor.rs @@ -48,6 +48,40 @@ impl RevisionCompactor { retention, }) } + + /// perform auto compaction logic + async fn do_compact(&self, last_revision: Option) -> Option { + if !self.is_leader.load(Relaxed) { + return None; + } + + let target_revision = self.revision_getter.get().overflow_sub(self.retention); + if target_revision <= 0 || Some(target_revision) <= last_revision { + return None; + } + + let now = Instant::now(); + info!( + "starting auto revision compaction, revision = {}, retention = {}", + target_revision, self.retention + ); + // TODO: add more error processing logic + if let Err(e) = self.client.compact(target_revision).await { + warn!( + "failed auto revision compaction, revision = {}, retention = {}, error: {:?}", + target_revision, self.retention, e + ); + None + } else { + info!( + "completed auto revision compaction, revision = {}, retention = {}, took {:?}", + target_revision, + self.retention, + now.elapsed().as_secs() + ); + Some(target_revision) + } + } } #[async_trait::async_trait] @@ -62,15 +96,15 @@ impl Compactor for RevisionCompactor { #[allow(clippy::integer_arithmetic)] async fn run(&self) { - let last_revision = 0; + let mut last_revision = None; let shutdown_trigger = self.shutdown_trigger.listen(); let mut ticker = tokio::time::interval(CHECK_INTERVAL); tokio::pin!(shutdown_trigger); loop { tokio::select! { _ = ticker.tick() => { - if !self.is_leader.load(Relaxed) { - continue; + if let Some(last_compacted_rev) = self.do_compact(last_revision).await { + last_revision = Some(last_compacted_rev); } } // To ensure that each iteration invokes the same `shutdown_trigger` and keeps @@ -79,31 +113,40 @@ impl Compactor for RevisionCompactor { break; } } + } + } +} - let target_revision = self.revision_getter.get().overflow_sub(self.retention); - if target_revision <= 0 || target_revision == last_revision { - continue; - } +#[cfg(test)] +mod test { + use super::*; + use crate::storage::compact::MockCompactable; - let now = Instant::now(); - info!( - "starting auto revision compaction, revision = {}, retention = {}", - target_revision, self.retention - ); - // TODO: add more error processing logic - if let Err(e) = self.client.compact(target_revision).await { - warn!( - "failed auto revision compaction, revision = {}, retention = {}, error: {:?}", - target_revision, self.retention, e - ); - } else { - info!( - "completed auto revision compaction, revision = {}, retention = {}, took {:?}", - target_revision, - self.retention, - now.elapsed().as_secs() - ); - } - } + #[tokio::test] + async fn revision_compactor_should_work_in_normal_path() { + let mut compactable = MockCompactable::new(); + compactable.expect_compact().times(3).returning(|_| Ok(())); + let shutdown_trigger = Arc::new(Event::new()); + let revision_gen = Arc::new(RevisionNumberGenerator::new(110)); + let revision_compactor = RevisionCompactor::new_arc( + true, + Arc::new(compactable), + Arc::clone(&revision_gen), + shutdown_trigger, + 100, + ); + // auto_compactor works successfully + assert_eq!(revision_compactor.do_compact(None).await, Some(10)); + revision_gen.next(); // current revision: 111 + assert_eq!(revision_compactor.do_compact(Some(10)).await, 
Some(11)); + revision_compactor.pause(); + revision_gen.next(); // current revision 112 + assert!(revision_compactor.do_compact(Some(11)).await.is_none()); + revision_gen.next(); // current revision 113 + assert!(revision_compactor.do_compact(Some(11)).await.is_none()); + revision_compactor.resume(); + assert_eq!(revision_compactor.do_compact(Some(11)).await, Some(13)); + // auto compactor should skip those revisions which have been auto compacted. + assert!(revision_compactor.do_compact(Some(13)).await.is_none()); } } From a737614c74cdd557a21d5e15f135bedf964c84fa Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Sun, 30 Jul 2023 20:08:46 +0800 Subject: [PATCH 8/9] refactor: refactor auto compactor implementation * using static generic instead of dynamic trait object * improve revision window's implementation Signed-off-by: Phoeniix Zhao --- xline-client/src/types/auth.rs | 1 - xline-client/src/types/kv.rs | 3 +- xline-client/src/types/lease.rs | 2 +- xline-client/src/types/watch.rs | 3 +- xline-client/tests/watch.rs | 7 ++- xline/src/server/xline_server.rs | 16 ++--- xline/src/storage/compact/mod.rs | 4 +- .../src/storage/compact/periodic_compactor.rs | 62 ++++++------------- .../src/storage/compact/revision_compactor.rs | 10 +-- xlinectl/src/command/get.rs | 3 +- xlinectl/src/command/put.rs | 3 +- xlinectl/src/main.rs | 10 ++- 12 files changed, 52 insertions(+), 72 deletions(-) diff --git a/xline-client/src/types/auth.rs b/xline-client/src/types/auth.rs index c2b49b542..b0d7db90f 100644 --- a/xline-client/src/types/auth.rs +++ b/xline-client/src/types/auth.rs @@ -1,5 +1,4 @@ use xline::server::KeyRange; - pub use xlineapi::{ AuthDisableResponse, AuthEnableResponse, AuthRoleAddResponse, AuthRoleDeleteResponse, AuthRoleGetResponse, AuthRoleGrantPermissionResponse, AuthRoleListResponse, diff --git a/xline-client/src/types/kv.rs b/xline-client/src/types/kv.rs index 44ddd7000..bc16fa1ba 100644 --- a/xline-client/src/types/kv.rs +++ b/xline-client/src/types/kv.rs @@ -1,5 +1,4 @@ use xline::server::KeyRange; - pub use xlineapi::{ CompareResult, CompareTarget, DeleteRangeResponse, PutResponse, RangeResponse, Response, ResponseOp, SortOrder, SortTarget, TargetUnion, TxnResponse, @@ -714,7 +713,7 @@ impl From for xlineapi::TxnRequest { /// Compaction Request compacts the key-value store up to a given revision. /// All keys with revisions less than the given revision will be compacted. -/// The compaction process will remove all historical versions of these keys, except for the most recent one. +/// The compaction process will remove all historical versions of these keys, except for the most recent one. /// For example, here is a revision list: [(A, 1), (A, 2), (A, 3), (A, 4), (A, 5)]. /// We compact at revision 3. After the compaction, the revision list will become [(A, 3), (A, 4), (A, 5)]. /// All revisions less than 3 are deleted. The latest revision, 3, will be kept. diff --git a/xline-client/src/types/lease.rs b/xline-client/src/types/lease.rs index 4729c7c83..01eed4fb7 100644 --- a/xline-client/src/types/lease.rs +++ b/xline-client/src/types/lease.rs @@ -1,9 +1,9 @@ use futures::channel::mpsc::Sender; - pub use xlineapi::{ LeaseGrantResponse, LeaseKeepAliveResponse, LeaseLeasesResponse, LeaseRevokeResponse, LeaseStatus, LeaseTimeToLiveResponse, }; + use crate::error::{ClientError, Result}; /// The lease keep alive handle. 
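
As the PATCH 8/9 message above states, the compactors now hold the curp client as a static generic parameter instead of a dynamic trait object, which allows the unit tests to plug in a mock client without boxing. The sketch below shows that pattern on its own; the Compactable trait body, the DynCompactor and GenericCompactor struct names, and the NoopClient mock are invented here for illustration and are not the crate's real definitions.

use std::sync::Arc;

/// Anything that can be asked to compact up to a revision (illustrative trait).
trait Compactable {
    fn compact(&self, revision: i64) -> Result<(), String>;
}

/// Before: dynamic dispatch, the concrete client type is erased behind a trait object.
struct DynCompactor {
    client: Arc<dyn Compactable + Send + Sync>,
}

/// After: static dispatch, the client type is a compile-time parameter, so the
/// compactor is monomorphized per client and tests can inject a mock directly.
struct GenericCompactor<C: Compactable> {
    client: Arc<C>,
}

impl<C: Compactable> GenericCompactor<C> {
    fn compact_to(&self, revision: i64) -> Result<(), String> {
        self.client.compact(revision)
    }
}

/// A trivial client used only to exercise both shapes.
struct NoopClient;
impl Compactable for NoopClient {
    fn compact(&self, _revision: i64) -> Result<(), String> {
        Ok(())
    }
}

fn main() {
    // Old shape: type-erased client behind a trait object.
    let legacy = DynCompactor { client: Arc::new(NoopClient) };
    assert!(legacy.client.compact(1).is_ok());
    // New shape: the concrete client type is known at compile time.
    let compactor = GenericCompactor { client: Arc::new(NoopClient) };
    assert!(compactor.compact_to(42).is_ok());
}
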
diff --git a/xline-client/src/types/watch.rs b/xline-client/src/types/watch.rs index e91c960c6..b98d2b45f 100644 --- a/xline-client/src/types/watch.rs +++ b/xline-client/src/types/watch.rs @@ -2,12 +2,11 @@ use std::fmt::Debug; use futures::channel::mpsc::Sender; use xline::server::KeyRange; +pub use xlineapi::{Event, EventType, KeyValue, WatchResponse}; use xlineapi::{RequestUnion, WatchCancelRequest, WatchProgressRequest}; use crate::error::{ClientError, Result}; -pub use xlineapi::{Event, EventType, KeyValue, WatchResponse}; - /// The watching handle. #[derive(Debug)] pub struct Watcher { diff --git a/xline-client/tests/watch.rs b/xline-client/tests/watch.rs index 03efedec3..716d314e8 100644 --- a/xline-client/tests/watch.rs +++ b/xline-client/tests/watch.rs @@ -1,9 +1,10 @@ //! The following tests are originally from `etcd-client` -use crate::common::get_cluster_client; use xline_client::{ error::Result, - types::kv::PutRequest, - types::watch::{EventType, WatchRequest}, + types::{ + kv::PutRequest, + watch::{EventType, WatchRequest}, + }, }; use crate::common::get_cluster_client; diff --git a/xline/src/server/xline_server.rs b/xline/src/server/xline_server.rs index 601011ca4..f2bc28988 100644 --- a/xline/src/server/xline_server.rs +++ b/xline/src/server/xline_server.rs @@ -329,14 +329,16 @@ impl XlineServer { let auto_compactor = if let Some(auto_config_cfg) = *self.compact_cfg.auto_compact_config() { - auto_compactor( - self.is_leader, - Arc::clone(&client), - header_gen.revision_arc(), - Arc::clone(&self.shutdown_trigger), - auto_config_cfg, + Some( + auto_compactor( + self.is_leader, + Arc::clone(&client), + header_gen.general_revision_arc(), + Arc::clone(&self.shutdown_trigger), + auto_config_cfg, + ) + .await, ) - .await } else { None }; diff --git a/xline/src/storage/compact/mod.rs b/xline/src/storage/compact/mod.rs index e738a69fe..56abb8bc7 100644 --- a/xline/src/storage/compact/mod.rs +++ b/xline/src/storage/compact/mod.rs @@ -70,7 +70,7 @@ pub(crate) async fn auto_compactor( revision_getter: Arc, shutdown_trigger: Arc, auto_compact_cfg: AutoCompactConfig, -) -> Option> { +) -> Arc { let auto_compactor: Arc = match auto_compact_cfg { AutoCompactConfig::Periodic(period) => { PeriodicCompactor::new_arc(is_leader, client, revision_getter, shutdown_trigger, period) @@ -90,7 +90,7 @@ pub(crate) async fn auto_compactor( let _hd = tokio::spawn(async move { auto_compactor.run().await; }); - Some(compactor_handle) + compactor_handle } /// background compact executor diff --git a/xline/src/storage/compact/periodic_compactor.rs b/xline/src/storage/compact/periodic_compactor.rs index b3aba39e6..a3d8b12af 100644 --- a/xline/src/storage/compact/periodic_compactor.rs +++ b/xline/src/storage/compact/periodic_compactor.rs @@ -28,61 +28,38 @@ impl RevisionWindow { /// Create a new `RevisionWindow` fn new(retention: usize) -> Self { Self { - ring_buf: Vec::with_capacity(retention), + ring_buf: vec![0; retention], cursor: retention.overflow_sub(1), retention, } } /// Store the revision into the inner ring buffer - #[allow(clippy::integer_arithmetic)] + #[allow(clippy::integer_arithmetic, clippy::indexing_slicing)] fn sample(&mut self, revision: i64) { self.cursor = (self.cursor + 1) % self.retention; // it's ok to do so since cursor will never overflow - match self.ring_buf.len().cmp(&self.retention) { - Ordering::Less => self.ring_buf.push(revision), - Ordering::Equal => { - if let Some(element) = self.ring_buf.get_mut(self.cursor) { - *element = revision; - } else { - unreachable!( - "ring_buf 
({:?}) at {} should not be None", - self.ring_buf, self.cursor - ); - } - } - Ordering::Greater => { - unreachable!( - "the length of RevisionWindow should be less than {}", - self.retention - ) - } - } + self.ring_buf[self.cursor] = revision; } /// Retrieve the expired revision that is sampled period ago #[allow(clippy::indexing_slicing, clippy::integer_arithmetic)] fn expired_revision(&self) -> Option { - debug_assert!( - self.ring_buf.len() <= self.retention, - "the length of RevisionWindow should be less than {}", - self.retention - ); - if self.ring_buf.len() < self.retention { + let target = self.ring_buf[(self.cursor + 1) % self.retention]; + if target == 0 { None } else { - let target = (self.cursor + 1) % self.retention; - Some(self.ring_buf[target]) // it's ok to do so since ring_buf[target] should not be None. + Some(target) } } } /// Revision auto compactor #[derive(Debug)] -pub(crate) struct PeriodicCompactor { +pub(crate) struct PeriodicCompactor { /// `is_leader` indicates whether the current node is a leader or not. is_leader: AtomicBool, /// curp client - client: Arc, + client: Arc, /// revision getter revision_getter: Arc, /// shutdown trigger @@ -91,11 +68,11 @@ pub(crate) struct PeriodicCompactor { period: Duration, } -impl PeriodicCompactor { +impl PeriodicCompactor { /// Creates a new revision compactor pub(super) fn new_arc( is_leader: bool, - client: Arc, + client: Arc, revision_getter: Arc, shutdown_trigger: Arc, period: Duration, @@ -150,10 +127,11 @@ impl PeriodicCompactor { /// Calculate the sample frequency and the total amount of samples. fn sample_config(period: Duration) -> (Duration, usize) { - let one_hour = Duration::from_secs(60.overflow_mul(60)); - let base_interval = match period.cmp(&one_hour) { + /// one hour duration + const ONEHOUR: Duration = Duration::from_secs(3600); + let base_interval = match period.cmp(&ONEHOUR) { Ordering::Less => period, - Ordering::Equal | Ordering::Greater => one_hour, + Ordering::Equal | Ordering::Greater => ONEHOUR, }; let divisor = 10; let check_interval = base_interval @@ -170,7 +148,7 @@ fn sample_config(period: Duration) -> (Duration, usize) { } #[async_trait::async_trait] -impl Compactor for PeriodicCompactor { +impl Compactor for PeriodicCompactor { fn pause(&self) { self.is_leader.store(false, Relaxed); } @@ -214,18 +192,18 @@ mod test { fn revision_window_should_work() { let mut rw = RevisionWindow::new(3); assert!(rw.expired_revision().is_none()); - rw.sample(0); - assert!(rw.expired_revision().is_none()); rw.sample(1); assert!(rw.expired_revision().is_none()); rw.sample(2); - assert_eq!(rw.expired_revision(), Some(0)); + assert!(rw.expired_revision().is_none()); + rw.sample(3); + assert_eq!(rw.expired_revision(), Some(1)); // retention is 2 - // The first 3 minutes: 0,1,2 + // The first 3 minutes: 1,2,3 // The second 2 minutes: 3,4 rw.sample(3); rw.sample(4); - assert_eq!(rw.expired_revision(), Some(2)); + assert_eq!(rw.expired_revision(), Some(3)); } #[test] diff --git a/xline/src/storage/compact/revision_compactor.rs b/xline/src/storage/compact/revision_compactor.rs index 9b3327091..8603ffad9 100644 --- a/xline/src/storage/compact/revision_compactor.rs +++ b/xline/src/storage/compact/revision_compactor.rs @@ -18,11 +18,11 @@ const CHECK_INTERVAL: Duration = Duration::from_secs(5 * 60); /// Revision auto compactor #[derive(Debug)] -pub(crate) struct RevisionCompactor { +pub(crate) struct RevisionCompactor { /// `is_leader` indicates whether the current node is a leader or not. 
is_leader: AtomicBool, /// curp client - client: Arc, + client: Arc, /// revision getter revision_getter: Arc, /// shutdown trigger @@ -31,11 +31,11 @@ pub(crate) struct RevisionCompactor { retention: i64, } -impl RevisionCompactor { +impl RevisionCompactor { /// Creates a new revision compactor pub(super) fn new_arc( is_leader: bool, - client: Arc, + client: Arc, revision_getter: Arc, shutdown_trigger: Arc, retention: i64, @@ -85,7 +85,7 @@ impl RevisionCompactor { } #[async_trait::async_trait] -impl Compactor for RevisionCompactor { +impl Compactor for RevisionCompactor { fn pause(&self) { self.is_leader.store(false, Relaxed); } diff --git a/xlinectl/src/command/get.rs b/xlinectl/src/command/get.rs index af4f72598..7f0fd0911 100644 --- a/xlinectl/src/command/get.rs +++ b/xlinectl/src/command/get.rs @@ -117,9 +117,8 @@ pub(crate) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result #[cfg(test)] mod tests { - use crate::testcase_struct; - use super::*; + use crate::testcase_struct; testcase_struct!(RangeRequest); diff --git a/xlinectl/src/command/put.rs b/xlinectl/src/command/put.rs index 38871bf13..19f866768 100644 --- a/xlinectl/src/command/put.rs +++ b/xlinectl/src/command/put.rs @@ -50,9 +50,8 @@ pub(crate) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result #[cfg(test)] mod tests { - use crate::testcase_struct; - use super::*; + use crate::testcase_struct; testcase_struct!(PutRequest); diff --git a/xlinectl/src/main.rs b/xlinectl/src/main.rs index 0af5a6d51..8f4841ba5 100644 --- a/xlinectl/src/main.rs +++ b/xlinectl/src/main.rs @@ -158,9 +158,13 @@ use clap::{arg, value_parser, Command}; use ext_utils::config::ClientTimeout; use xline_client::{Client, ClientOptions}; -use crate::command::{get, put}; -use crate::utils::parser::{parse_endpoints, parse_user}; -use crate::utils::printer::{set_printer_type, PrinterType}; +use crate::{ + command::{get, put}, + utils::{ + parser::{parse_endpoints, parse_user}, + printer::{set_printer_type, PrinterType}, + }, +}; /// Command definitions and parsers mod command; From 39478eb7345a645e2a43724b31072c8557e1059b Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Fri, 4 Aug 2023 15:51:52 +0800 Subject: [PATCH 9/9] refactor: refactor the error handling logic in compact Closes: 188 Signed-off-by: Phoeniix Zhao --- xline/src/server/kv_server.rs | 78 +++++-------------- xline/src/server/watch_server.rs | 32 +++----- xline/src/storage/compact/mod.rs | 25 ++++-- .../src/storage/compact/periodic_compactor.rs | 25 ++++-- .../src/storage/compact/revision_compactor.rs | 23 ++++-- xline/src/storage/mod.rs | 1 - 6 files changed, 85 insertions(+), 99 deletions(-) diff --git a/xline/src/server/kv_server.rs b/xline/src/server/kv_server.rs index 38f6e6702..9d8e244a1 100644 --- a/xline/src/server/kv_server.rs +++ b/xline/src/server/kv_server.rs @@ -437,18 +437,12 @@ mod test { ..Default::default() }; - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} is higher than current revision {}", - range_request_with_future_rev.revision, current_revision - )) - .to_string(); - let message = tonic::Status::from( + let expected_tonic_status = tonic::Status::from( range_request_with_future_rev .check_revision(compacted_revision, current_revision) .unwrap_err(), - ) - .to_string(); - assert_eq!(message, expected_err_message); + ); + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); let range_request_with_compacted_rev = RangeRequest { key: b"foo".to_vec(), @@ -456,19 +450,13 @@ mod test { 
..Default::default() }; - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} has been compacted, compacted revision is {}", - range_request_with_compacted_rev.revision, compacted_revision - )) - .to_string(); - - let message = tonic::Status::from( + let expected_tonic_status = tonic::Status::from( range_request_with_compacted_rev .check_revision(compacted_revision, current_revision) .unwrap_err(), - ) - .to_string(); - assert_eq!(message, expected_err_message); + ); + + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); } #[tokio::test] @@ -487,20 +475,13 @@ mod test { failure: vec![], }; - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} is higher than current revision {}", - 20, current_revision - )) - .to_string(); - - let message = tonic::Status::from( + let expected_tonic_status = tonic::Status::from( txn_request_with_future_revision .check_revision(compacted_revision, current_revision) .unwrap_err(), - ) - .to_string(); + ); - assert_eq!(message, expected_err_message); + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); let txn_request_with_compacted_revision = TxnRequest { compare: vec![], @@ -514,20 +495,13 @@ mod test { failure: vec![], }; - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} has been compacted, compacted revision is {}", - 3, compacted_revision - )) - .to_string(); - - let message = tonic::Status::from( + let expected_tonic_status = tonic::Status::from( txn_request_with_compacted_revision .check_revision(compacted_revision, current_revision) .unwrap_err(), - ) - .to_string(); + ); - assert_eq!(message, expected_err_message); + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); } #[tokio::test] @@ -537,24 +511,12 @@ mod test { ..Default::default() }; - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} is higher than current revision {}", - compact_request.revision, 8 - )) - .to_string(); - - let message = - tonic::Status::from(compact_request.check_revision(3, 8).unwrap_err()).to_string(); - assert_eq!(message, expected_err_message); - - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} has been compacted, compacted revision is {}", - compact_request.revision, 13 - )) - .to_string(); - - let message = - tonic::Status::from(compact_request.check_revision(13, 18).unwrap_err()).to_string(); - assert_eq!(message, expected_err_message); + let expected_tonic_status = + tonic::Status::from(compact_request.check_revision(3, 8).unwrap_err()); + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); + + let expected_tonic_status = + tonic::Status::from(compact_request.check_revision(13, 18).unwrap_err()); + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); } } diff --git a/xline/src/server/watch_server.rs b/xline/src/server/watch_server.rs index b3345ed32..736574c72 100644 --- a/xline/src/server/watch_server.rs +++ b/xline/src/server/watch_server.rs @@ -290,17 +290,17 @@ where /// Handle watch event async fn handle_watch_event(&mut self, mut watch_event: WatchEvent) { let watch_id = watch_event.watch_id(); - let response = if watch_event.compacted() { - WatchResponse { - header: Some(ResponseHeader { - revision: watch_event.revision(), - ..ResponseHeader::default() - }), - watch_id, - compact_revision: self.kv_watcher.compacted_revision(), - canceled: true, - ..WatchResponse::default() - } + let mut response = 
WatchResponse { + header: Some(ResponseHeader { + revision: watch_event.revision(), + ..ResponseHeader::default() + }), + watch_id, + ..WatchResponse::default() + }; + if watch_event.compacted() { + response.compact_revision = self.kv_watcher.compacted_revision(); + response.canceled = true; } else { let mut events = watch_event.take_events(); if events.is_empty() { @@ -318,15 +318,7 @@ where } } } - WatchResponse { - header: Some(ResponseHeader { - revision: watch_event.revision(), - ..ResponseHeader::default() - }), - watch_id, - events, - ..WatchResponse::default() - } + response.events = events; }; if self.response_tx.send(Ok(response)).await.is_err() { diff --git a/xline/src/storage/compact/mod.rs b/xline/src/storage/compact/mod.rs index 56abb8bc7..4036131e3 100644 --- a/xline/src/storage/compact/mod.rs +++ b/xline/src/storage/compact/mod.rs @@ -1,7 +1,11 @@ use std::{sync::Arc, time::Duration}; use async_trait::async_trait; -use curp::{client::Client, cmd::ProposeId, error::ProposeError}; +use curp::{ + client::Client, + cmd::ProposeId, + error::CommandProposeError::{AfterSync, Execute}, +}; use event_listener::Event; use periodic_compactor::PeriodicCompactor; use revision_compactor::RevisionCompactor; @@ -12,7 +16,7 @@ use uuid::Uuid; use super::{ index::{Index, IndexOperate}, storage_api::StorageApi, - KvStore, + ExecuteError, KvStore, }; use crate::{ revision_number::RevisionNumberGenerator, @@ -45,12 +49,12 @@ pub(crate) trait Compactor: std::fmt::Debug + Send + Sync { #[async_trait] pub(crate) trait Compactable: std::fmt::Debug + Send + Sync { /// do compact - async fn compact(&self, revision: i64) -> Result<(), ProposeError>; + async fn compact(&self, revision: i64) -> Result<(), ExecuteError>; } #[async_trait] impl Compactable for Client { - async fn compact(&self, revision: i64) -> Result<(), ProposeError> { + async fn compact(&self, revision: i64) -> Result<(), ExecuteError> { let request = CompactionRequest { revision, physical: false, @@ -58,8 +62,17 @@ impl Compactable for Client { let request_wrapper = RequestWithToken::new_with_token(request.into(), None); let propose_id = ProposeId::new(format!("auto-compactor-{}", Uuid::new_v4())); let cmd = Command::new(vec![], request_wrapper, propose_id); - let _cmd_res = self.propose(cmd).await?; - Ok(()) + if let Err(e) = self.propose(cmd).await { + #[allow(clippy::wildcard_enum_match_arm)] + match e { + Execute(e) | AfterSync(e) => Err(e), + _ => { + unreachable!("Compaction should not receive any errors other than ExecuteError, but it receives {e:?}"); + } + } + } else { + Ok(()) + } } } diff --git a/xline/src/storage/compact/periodic_compactor.rs b/xline/src/storage/compact/periodic_compactor.rs index a3d8b12af..76caf8812 100644 --- a/xline/src/storage/compact/periodic_compactor.rs +++ b/xline/src/storage/compact/periodic_compactor.rs @@ -12,7 +12,7 @@ use event_listener::Event; use tracing::{info, warn}; use super::{Compactable, Compactor}; -use crate::revision_number::RevisionNumberGenerator; +use crate::{revision_number::RevisionNumberGenerator, storage::ExecuteError}; /// `RevisionWindow` is a ring buffer used to store periodically sampled revision. 
struct RevisionWindow { @@ -106,13 +106,24 @@ impl PeriodicCompactor { "starting auto periodic compaction, revision = {}, period = {:?}", revision, self.period ); - // TODO: add more error processing logic + if let Err(e) = self.client.compact(revision).await { - warn!( - "failed auto revision compaction, revision = {}, period = {:?}, error: {:?}", - revision, self.period, e - ); - None + if let ExecuteError::RevisionCompacted(_rev, compacted_rev) = e { + info!( + "required revision {} has been compacted, the current compacted revision is {}, period = {:?}, took {:?}", + revision, + compacted_rev, + self.period, + now.elapsed().as_secs() + ); + Some(compacted_rev) + } else { + warn!( + "failed auto revision compaction, revision = {}, period = {:?}, error: {:?}", + revision, self.period, e + ); + None + } } else { info!( "completed auto revision compaction, revision = {}, period = {:?}, took {:?}", diff --git a/xline/src/storage/compact/revision_compactor.rs b/xline/src/storage/compact/revision_compactor.rs index 8603ffad9..6f38cc0a0 100644 --- a/xline/src/storage/compact/revision_compactor.rs +++ b/xline/src/storage/compact/revision_compactor.rs @@ -11,7 +11,7 @@ use event_listener::Event; use tracing::{info, warn}; use super::{Compactable, Compactor}; -use crate::revision_number::RevisionNumberGenerator; +use crate::{revision_number::RevisionNumberGenerator, storage::ExecuteError}; /// check for the need of compaction every 5 minutes const CHECK_INTERVAL: Duration = Duration::from_secs(5 * 60); @@ -65,13 +65,22 @@ impl RevisionCompactor { "starting auto revision compaction, revision = {}, retention = {}", target_revision, self.retention ); - // TODO: add more error processing logic if let Err(e) = self.client.compact(target_revision).await { - warn!( - "failed auto revision compaction, revision = {}, retention = {}, error: {:?}", - target_revision, self.retention, e - ); - None + if let ExecuteError::RevisionCompacted(_rev, compacted_rev) = e { + info!( + "required revision {} has been compacted, the current compacted revision is {}, retention = {:?}", + target_revision, + compacted_rev, + self.retention, + ); + Some(compacted_rev) + } else { + warn!( + "failed auto revision compaction, revision = {}, retention = {}, error: {:?}", + target_revision, self.retention, e + ); + None + } } else { info!( "completed auto revision compaction, revision = {}, retention = {}, took {:?}", diff --git a/xline/src/storage/mod.rs b/xline/src/storage/mod.rs index a3961d980..1463cdcb1 100644 --- a/xline/src/storage/mod.rs +++ b/xline/src/storage/mod.rs @@ -22,7 +22,6 @@ pub(crate) mod snapshot_allocator; pub(crate) mod storage_api; pub use self::execute_error::ExecuteError; - pub(crate) use self::{ auth_store::AuthStore, kv_store::KvStore, lease_store::LeaseStore, revision::Revision, };
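
The final patch in the series routes compaction failures through ExecuteError and treats a "required revision has been compacted" error as a benign outcome that still advances the compactor's last compacted revision, rather than logging it as a failure. The following sketch captures that mapping with a simplified stand-in error type; the enum CompactError and the helper next_last_revision are assumptions made for this note and do not reproduce the crate's ExecuteError.

/// Simplified stand-in for the storage execute error handled in the patch.
#[derive(Debug)]
enum CompactError {
    /// The requested revision was already compacted; carries the revision the
    /// store is currently compacted to.
    RevisionCompacted(i64),
    /// Any other failure.
    Other(String),
}

/// Map a compaction result to the revision the compactor should remember:
/// success keeps the requested revision, "already compacted" fast-forwards to
/// the reported compacted revision, and everything else is a real failure.
fn next_last_revision(requested: i64, res: Result<(), CompactError>) -> Option<i64> {
    match res {
        Ok(()) => Some(requested),
        Err(CompactError::RevisionCompacted(compacted)) => Some(compacted),
        Err(CompactError::Other(_)) => None,
    }
}

fn main() {
    // Normal completion advances to the requested revision.
    assert_eq!(next_last_revision(10, Ok(())), Some(10));
    // An already-compacted revision is not an error; adopt the newer revision.
    assert_eq!(
        next_last_revision(10, Err(CompactError::RevisionCompacted(15))),
        Some(15)
    );
    // Other failures leave the compactor state untouched.
    assert_eq!(
        next_last_revision(10, Err(CompactError::Other("network".into()))),
        None
    );
}
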