diff --git a/Cargo.lock b/Cargo.lock index 59e9b72a1..c42af8dee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3010,6 +3010,7 @@ name = "utils" version = "0.1.0" dependencies = [ "async-trait", + "clippy-utilities 0.2.0", "derive_builder", "getset", "madsim-tokio", diff --git a/curp-external-api/src/snapshot.rs b/curp-external-api/src/snapshot.rs index 8b21ecdf2..1f653041a 100644 --- a/curp-external-api/src/snapshot.rs +++ b/curp-external-api/src/snapshot.rs @@ -1,6 +1,7 @@ +use std::error::Error; + use async_trait::async_trait; use engine::Snapshot as EngineSnapshot; -use std::error::Error; /// The snapshot allocation is handled by the upper-level application #[allow(clippy::module_name_repetitions)] // it's re-exported in lib diff --git a/curp/src/log_entry.rs b/curp/src/log_entry.rs index 5a5155d0e..af6a7517c 100644 --- a/curp/src/log_entry.rs +++ b/curp/src/log_entry.rs @@ -1,8 +1,7 @@ use std::sync::Arc; -use serde::{Deserialize, Serialize}; - use curp_external_api::LogIndex; +use serde::{Deserialize, Serialize}; /// Log entry #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/curp/src/rpc/mod.rs b/curp/src/rpc/mod.rs index fb837d2bd..ac4a475d4 100644 --- a/curp/src/rpc/mod.rs +++ b/curp/src/rpc/mod.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use serde::{de::DeserializeOwned, Serialize}; -pub use self::proto::protocol_server::ProtocolServer; pub(crate) use self::proto::{ fetch_read_state_response::ReadState, propose_response::ExeResult, @@ -13,8 +12,8 @@ pub(crate) use self::proto::{ WaitSyncedRequest, WaitSyncedResponse, }; pub use self::proto::{ - propose_response, protocol_client, FetchLeaderRequest, FetchLeaderResponse, ProposeRequest, - ProposeResponse, + propose_response, protocol_client, protocol_server::ProtocolServer, FetchLeaderRequest, + FetchLeaderResponse, ProposeRequest, ProposeResponse, }; use crate::{ cmd::{Command, ProposeId}, diff --git a/curp/src/server/cmd_worker/mod.rs b/curp/src/server/cmd_worker/mod.rs index 5a24ada1f..edb13b690 100644 --- a/curp/src/server/cmd_worker/mod.rs +++ b/curp/src/server/cmd_worker/mod.rs @@ -265,16 +265,16 @@ pub(super) fn start_cmd_workers< mod tests { use std::time::Duration; - use tokio::{sync::mpsc, time::Instant}; - use tracing_test::traced_test; - - use super::*; - use crate::log_entry::LogEntry; use curp_test_utils::{ mock_role_change, sleep_millis, sleep_secs, test_cmd::{TestCE, TestCommand}, }; use test_macros::abort_on_panic; + use tokio::{sync::mpsc, time::Instant}; + use tracing_test::traced_test; + + use super::*; + use crate::log_entry::LogEntry; // This should happen in fast path in most cases #[traced_test] diff --git a/curp/src/server/curp_node.rs b/curp/src/server/curp_node.rs index 0c03c5e27..54caac77b 100644 --- a/curp/src/server/curp_node.rs +++ b/curp/src/server/curp_node.rs @@ -13,7 +13,7 @@ use tokio::{ time::MissedTickBehavior, }; use tracing::{debug, error, info, warn}; -use utils::config::{ClientTimeout, CurpConfig}; +use utils::config::CurpConfig; use super::{ cmd_board::{CmdBoardRef, CommandBoard}, @@ -24,7 +24,6 @@ use super::{ storage::{StorageApi, StorageError}, }; use crate::{ - client::Client, cmd::{Command, CommandExecutor}, error::RpcError, log_entry::LogEntry, @@ -274,12 +273,6 @@ impl CurpNode { /// Spawned tasks impl CurpNode { - /// get curp inner client from `CurpNode` - #[inline] - pub(crate) async fn inner_client(&self, client_timeout: ClientTimeout) -> Client { - self.curp.inner_client(client_timeout).await - } - /// Tick periodically async fn election_task( curp: Arc>, diff --git 
a/curp/src/server/gc.rs b/curp/src/server/gc.rs index 091b7908a..11a036e67 100644 --- a/curp/src/server/gc.rs +++ b/curp/src/server/gc.rs @@ -72,6 +72,7 @@ async fn gc_cmd_board(cmd_board: CmdBoardRef, interval: mod tests { use std::{sync::Arc, time::Duration}; + use curp_test_utils::{sleep_secs, test_cmd::TestCommand}; use parking_lot::{Mutex, RwLock}; use test_macros::abort_on_panic; @@ -84,7 +85,6 @@ mod tests { spec_pool::{SpecPoolRef, SpeculativePool}, }, }; - use curp_test_utils::{sleep_secs, test_cmd::TestCommand}; #[tokio::test] #[abort_on_panic] diff --git a/curp/src/server/mod.rs b/curp/src/server/mod.rs index 22e1c7fde..7d2879416 100644 --- a/curp/src/server/mod.rs +++ b/curp/src/server/mod.rs @@ -9,14 +9,10 @@ use tokio_stream::wrappers::TcpListenerStream; #[cfg(not(madsim))] use tracing::info; use tracing::instrument; -use utils::{ - config::{ClientTimeout, CurpConfig}, - tracing::Extract, -}; +use utils::{config::CurpConfig, tracing::Extract}; use self::curp_node::{CurpError, CurpNode}; use crate::{ - client::Client, cmd::{Command, CommandExecutor}, error::ServerError, members::ClusterMember, @@ -303,12 +299,6 @@ impl Rpc { pub fn leader_rx(&self) -> broadcast::Receiver> { self.inner.leader_rx() } - - /// Get an inner client - #[inline] - pub async fn inner_client(&self, client_timeout: ClientTimeout) -> Client { - self.inner.inner_client(client_timeout).await - } } impl From for tonic::Status { diff --git a/curp/src/server/raw_curp/log.rs b/curp/src/server/raw_curp/log.rs index c89a457d4..3a4e3a38d 100644 --- a/curp/src/server/raw_curp/log.rs +++ b/curp/src/server/raw_curp/log.rs @@ -372,10 +372,10 @@ impl Log { mod tests { use std::{iter::repeat, ops::Index, sync::Arc}; + use curp_test_utils::test_cmd::TestCommand; use utils::config::{default_batch_max_size, default_log_entries_cap}; use super::*; - use curp_test_utils::test_cmd::TestCommand; // impl index for test is handy impl Index for Log { diff --git a/curp/src/server/raw_curp/mod.rs b/curp/src/server/raw_curp/mod.rs index c75a7be82..bf979b7ef 100644 --- a/curp/src/server/raw_curp/mod.rs +++ b/curp/src/server/raw_curp/mod.rs @@ -29,7 +29,7 @@ use tracing::{ log::{log_enabled, Level}, }; use utils::{ - config::{ClientTimeout, CurpConfig}, + config::CurpConfig, parking_lot_lock::{MutexMap, RwLockMap}, }; @@ -39,7 +39,6 @@ use self::{ }; use super::cmd_worker::CEEventTxApi; use crate::{ - client::Client, cmd::{Command, ProposeId}, error::ProposeError, log_entry::LogEntry, @@ -627,16 +626,6 @@ impl RawCurp { raw_curp } - /// get curp inner client - pub(crate) async fn inner_client(&self, timeout: ClientTimeout) -> Client { - Client::::new( - Some(self.id().clone()), - self.ctx.cluster_info.all_members(), - timeout, - ) - .await - } - /// Create a new `RawCurp` /// `is_leader` will only take effect when all servers start from a fresh state #[allow(clippy::too_many_arguments)] // only called once diff --git a/curp/src/server/raw_curp/tests.rs b/curp/src/server/raw_curp/tests.rs index 0154efd17..780073379 100644 --- a/curp/src/server/raw_curp/tests.rs +++ b/curp/src/server/raw_curp/tests.rs @@ -1,5 +1,6 @@ use std::time::Instant; +use curp_test_utils::{mock_role_change, test_cmd::TestCommand}; use test_macros::abort_on_panic; use tokio::{sync::oneshot, time::sleep}; use tracing_test::traced_test; @@ -18,7 +19,6 @@ use crate::{ }, LogIndex, }; -use curp_test_utils::{mock_role_change, test_cmd::TestCommand}; // Hooks for tests impl RawCurp { diff --git a/curp/src/snapshot.rs b/curp/src/snapshot.rs index 1926976bf..c451306ac 
100644 --- a/curp/src/snapshot.rs +++ b/curp/src/snapshot.rs @@ -1,8 +1,7 @@ -#[allow(clippy::module_name_repetitions)] // it's re-exported in lib -pub use curp_external_api::snapshot::SnapshotAllocator; - use std::fmt::Debug; +#[allow(clippy::module_name_repetitions)] // it's re-exported in lib +pub use curp_external_api::snapshot::SnapshotAllocator; use engine::Snapshot as EngineSnapshot; /// Snapshot diff --git a/scripts/validation_test.sh b/scripts/validation_test.sh index 90f367f8c..ca3846835 100755 --- a/scripts/validation_test.sh +++ b/scripts/validation_test.sh @@ -25,6 +25,25 @@ run_with_expect() { fi } +# run a command and check its output, based on keyword match +# args: +# $1: command to run +# $2: expected output +run_with_match() { + cmd="${1}" + res=$(eval ${cmd} 2>&1) + expect=$(echo -e ${2}) + if echo "${res}" | grep -q "${expect}"; then + echo "command run success" + else + echo "command run failed" + echo "command: ${cmd}" + echo "expect: ${expect}" + echo "result: ${res}" + exit 1 + fi +} + # validate kv requests kv_validation() { echo "kv validation test running..." @@ -189,6 +208,21 @@ maintenance_validation() { echo "maintenance validation test passed" } +# validate compact requests +compact_validation() { + echo "compact validation test running..." + for value in "value1" "value2" "value3" "value4" "value5" "value6"; + do + run_with_expect "${ETCDCTL} put key ${value}" "OK" + done + run_with_expect "${ETCDCTL} get --rev=4 key" "key\nvalue3" + run_with_expect "${ETCDCTL} compact 5" "compacted revision 5" + run_with_match "${ETCDCTL} get --rev=4 key" "required revision 4 has been compacted, compacted revision is 5" + run_with_match "${ETCDCTL} watch --rev=4 key " "watch was canceled (etcdserver: mvcc: required revision has been compacted)" + echo "compact validation test passed" +} + +compact_validation kv_validation watch_validation lease_validation diff --git a/simulation/tests/it/curp/server_election.rs b/simulation/tests/it/curp/server_election.rs index cd659cbf6..5b47a8969 100644 --- a/simulation/tests/it/curp/server_election.rs +++ b/simulation/tests/it/curp/server_election.rs @@ -1,7 +1,6 @@ use curp_test_utils::{init_logger, sleep_secs, test_cmd::TestCommand}; -use utils::config::ClientTimeout; - use simulation::curp_group::CurpGroup; +use utils::config::ClientTimeout; /// Wait some time for the election to finish, and get the leader to ensure that the election is /// completed. 
diff --git a/simulation/tests/it/curp/server_recovery.rs b/simulation/tests/it/curp/server_recovery.rs index 4ed9ae228..de9b55762 100644 --- a/simulation/tests/it/curp/server_recovery.rs +++ b/simulation/tests/it/curp/server_recovery.rs @@ -5,11 +5,10 @@ use std::sync::Arc; use curp_test_utils::{init_logger, sleep_secs, test_cmd::TestCommand, TEST_TABLE}; use engine::StorageEngine; use itertools::Itertools; +use simulation::curp_group::{CurpGroup, ProposeRequest}; use tracing::debug; use utils::config::ClientTimeout; -use simulation::curp_group::{CurpGroup, ProposeRequest}; - #[madsim::test] async fn leader_crash_and_recovery() { init_logger(); diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 3c49e1170..3ea216d72 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -36,6 +36,7 @@ toml = "0.5" thiserror = "1.0.31" tracing-appender = "0.2" derive_builder = "0.12.0" +clippy-utilities = "0.2.0" [dev-dependencies] opentelemetry-jaeger = "0.17.0" diff --git a/utils/src/config.rs b/utils/src/config.rs index 01bf30809..4c531214d 100644 --- a/utils/src/config.rs +++ b/utils/src/config.rs @@ -133,6 +133,9 @@ pub struct CompactConfig { #[getset(get = "pub")] #[serde(with = "duration_format", default = "default_compact_sleep_interval")] compact_sleep_interval: Duration, + /// The auto compactor config + #[getset(get = "pub")] + auto_compact_config: Option, } impl Default for CompactConfig { @@ -141,6 +144,7 @@ impl Default for CompactConfig { Self { compact_batch_size: default_compact_batch_size(), compact_sleep_interval: default_compact_sleep_interval(), + auto_compact_config: None, } } } @@ -149,10 +153,15 @@ impl CompactConfig { /// Create a new compact config #[must_use] #[inline] - pub fn new(compact_batch_size: usize, compact_sleep_interval: Duration) -> Self { + pub fn new( + compact_batch_size: usize, + compact_sleep_interval: Duration, + auto_compact_config: Option, + ) -> Self { Self { compact_batch_size, compact_sleep_interval, + auto_compact_config, } } } @@ -475,6 +484,23 @@ impl Default for ServerTimeout { } } +/// Auto Compactor Configuration +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] +#[serde( + tag = "mode", + content = "retention", + rename_all(deserialize = "lowercase") +)] +pub enum AutoCompactConfig { + /// auto periodic compactor + #[serde(with = "duration_format")] + Periodic(Duration), + /// auto revision compactor + Revision(i64), +} + /// Storage Configuration #[allow(clippy::module_name_repetitions)] #[non_exhaustive] @@ -751,6 +777,10 @@ mod tests { compact_batch_size = 123 compact_sleep_interval = '5ms' + [compact.auto_compact_config] + mode = 'periodic' + retention = '10h' + [log] path = '/var/log/xline' rotation = 'daily' @@ -785,6 +815,7 @@ mod tests { Duration::from_millis(20), Duration::from_secs(1), ); + assert_eq!( config.cluster, ClusterConfig::new( @@ -825,7 +856,10 @@ mod tests { config.compact, CompactConfig { compact_batch_size: 123, - compact_sleep_interval: Duration::from_millis(5) + compact_sleep_interval: Duration::from_millis(5), + auto_compact_config: Some(AutoCompactConfig::Periodic(Duration::from_secs( + 10 * 60 * 60 + ))) } ); } @@ -907,4 +941,52 @@ mod tests { ); assert_eq!(config.compact, CompactConfig::default()); } + + #[allow(clippy::unwrap_used)] + #[test] + fn test_auto_revision_compactor_config_should_be_loaded() { + let config: XlineServerConfig = toml::from_str( + r#"[cluster] + name = 'node1' + is_leader = true + + [cluster.members] + node1 = 
'127.0.0.1:2379' + node2 = '127.0.0.1:2380' + node3 = '127.0.0.1:2381' + + [cluster.storage] + + [log] + path = '/var/log/xline' + + [storage] + engine = 'memory' + + [compact] + + [compact.auto_compact_config] + mode = 'revision' + retention = 10000 + + [trace] + jaeger_online = false + jaeger_offline = false + jaeger_output_dir = './jaeger_jsons' + jaeger_level = 'info' + + [auth] + # auth_public_key = './public_key.pem' + # auth_private_key = './private_key.pem'"#, + ) + .unwrap(); + + assert_eq!( + config.compact, + CompactConfig { + auto_compact_config: Some(AutoCompactConfig::Revision(10000)), + ..Default::default() + } + ); + } } diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 8493374d9..502b7c6a4 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -138,6 +138,7 @@ use std::{collections::HashMap, time::Duration}; +use clippy_utilities::OverflowArithmetic; use thiserror::Error; use crate::config::{ClusterRange, LevelConfig, RotationConfig}; @@ -156,6 +157,13 @@ pub mod tokio_lock; /// utils for pass span context pub mod tracing; +/// seconds per minute +const SECS_PER_MINUTE: u64 = 60; +/// seconds per hour +const SECS_PER_HOUR: u64 = 3600; +/// seconds per day, equal to 24 * 60 * 60 = 86400 +const SECS_PER_DAY: u64 = 86400; + /// Config Parse Error #[derive(Debug, Error)] #[non_exhaustive] @@ -236,9 +244,36 @@ pub fn parse_duration(s: &str) -> Result { "the value of time should not be empty ({s})" ))) } + } else if s.ends_with('m') { + if let Some(dur) = s.strip_suffix('m') { + let minutes: u64 = dur.parse()?; + Ok(Duration::from_secs(minutes.overflow_mul(SECS_PER_MINUTE))) + } else { + Err(ConfigParseError::InvalidValue(format!( + "the value of time should not be empty ({s})" + ))) + } + } else if s.ends_with('h') { + if let Some(dur) = s.strip_suffix('h') { + let hours: u64 = dur.parse()?; + Ok(Duration::from_secs(hours.overflow_mul(SECS_PER_HOUR))) + } else { + Err(ConfigParseError::InvalidValue(format!( + "the value of time should not be empty ({s})" + ))) + } + } else if s.ends_with('d') { + if let Some(dur) = s.strip_suffix('d') { + let days: u64 = dur.parse()?; + Ok(Duration::from_secs(days.overflow_mul(SECS_PER_DAY))) + } else { + Err(ConfigParseError::InvalidValue(format!( + "the value of time should not be empty ({s})" + ))) + } } else { Err(ConfigParseError::InvalidUnit(format!( - "the unit of time should be one of 'us', 'ms' or 's'({s})" + "the unit of time should be one of 'us', 'ms', 's', 'm', 'h' or 'd' ({s})" ))) } } @@ -322,6 +357,15 @@ mod test { assert_eq!(parse_duration("5s").unwrap(), Duration::from_secs(5)); assert_eq!(parse_duration("3ms").unwrap(), Duration::from_millis(3)); assert_eq!(parse_duration("1us").unwrap(), Duration::from_micros(1)); + assert_eq!(parse_duration("3m").unwrap(), Duration::from_secs(180)); + assert_eq!( + parse_duration("2h").unwrap(), + Duration::from_secs(2 * SECS_PER_HOUR) + ); + assert_eq!( + parse_duration("30d").unwrap(), + Duration::from_secs(30 * SECS_PER_DAY) + ); let results = vec![ parse_duration("hello world"), parse_duration("5x"), diff --git a/xline-client/src/types/auth.rs b/xline-client/src/types/auth.rs index c2b49b542..b0d7db90f 100644 --- a/xline-client/src/types/auth.rs +++ b/xline-client/src/types/auth.rs @@ -1,5 +1,4 @@ use xline::server::KeyRange; - pub use xlineapi::{ AuthDisableResponse, AuthEnableResponse, AuthRoleAddResponse, AuthRoleDeleteResponse, AuthRoleGetResponse, AuthRoleGrantPermissionResponse, AuthRoleListResponse, diff --git a/xline-client/src/types/kv.rs b/xline-client/src/types/kv.rs 
index 44ddd7000..bc16fa1ba 100644 --- a/xline-client/src/types/kv.rs +++ b/xline-client/src/types/kv.rs @@ -1,5 +1,4 @@ use xline::server::KeyRange; - pub use xlineapi::{ CompareResult, CompareTarget, DeleteRangeResponse, PutResponse, RangeResponse, Response, ResponseOp, SortOrder, SortTarget, TargetUnion, TxnResponse, @@ -714,7 +713,7 @@ impl From for xlineapi::TxnRequest { /// Compaction Request compacts the key-value store up to a given revision. /// All keys with revisions less than the given revision will be compacted. -/// The compaction process will remove all historical versions of these keys, except for the most recent one. +/// The compaction process will remove all historical versions of these keys, except for the most recent one. /// For example, here is a revision list: [(A, 1), (A, 2), (A, 3), (A, 4), (A, 5)]. /// We compact at revision 3. After the compaction, the revision list will become [(A, 3), (A, 4), (A, 5)]. /// All revisions less than 3 are deleted. The latest revision, 3, will be kept. diff --git a/xline-client/src/types/lease.rs b/xline-client/src/types/lease.rs index 04e41ee98..01eed4fb7 100644 --- a/xline-client/src/types/lease.rs +++ b/xline-client/src/types/lease.rs @@ -1,11 +1,11 @@ -use crate::error::{ClientError, Result}; use futures::channel::mpsc::Sender; - pub use xlineapi::{ LeaseGrantResponse, LeaseKeepAliveResponse, LeaseLeasesResponse, LeaseRevokeResponse, LeaseStatus, LeaseTimeToLiveResponse, }; +use crate::error::{ClientError, Result}; + /// The lease keep alive handle. #[derive(Debug)] pub struct LeaseKeeper { diff --git a/xline-client/src/types/watch.rs b/xline-client/src/types/watch.rs index e91c960c6..b98d2b45f 100644 --- a/xline-client/src/types/watch.rs +++ b/xline-client/src/types/watch.rs @@ -2,12 +2,11 @@ use std::fmt::Debug; use futures::channel::mpsc::Sender; use xline::server::KeyRange; +pub use xlineapi::{Event, EventType, KeyValue, WatchResponse}; use xlineapi::{RequestUnion, WatchCancelRequest, WatchProgressRequest}; use crate::error::{ClientError, Result}; -pub use xlineapi::{Event, EventType, KeyValue, WatchResponse}; - /// The watching handle. #[derive(Debug)] pub struct Watcher { diff --git a/xline-client/tests/watch.rs b/xline-client/tests/watch.rs index 788c66fa0..716d314e8 100644 --- a/xline-client/tests/watch.rs +++ b/xline-client/tests/watch.rs @@ -1,11 +1,14 @@ //! 
The following tests are originally from `etcd-client` -use crate::common::get_cluster_client; use xline_client::{ error::Result, - types::kv::PutRequest, - types::watch::{EventType, WatchRequest}, + types::{ + kv::PutRequest, + watch::{EventType, WatchRequest}, + }, }; +use crate::common::get_cluster_client; + mod common; #[tokio::test] diff --git a/xline/src/main.rs b/xline/src/main.rs index c3a259b1d..fbfb59076 100644 --- a/xline/src/main.rs +++ b/xline/src/main.rs @@ -157,8 +157,9 @@ use utils::{ default_propose_timeout, default_range_retry_timeout, default_retry_timeout, default_rotation, default_rpc_timeout, default_server_wait_synced_timeout, default_sync_victims_interval, default_watch_progress_notify_interval, file_appender, - AuthConfig, ClientTimeout, ClusterConfig, CompactConfig, CurpConfigBuilder, LevelConfig, - LogConfig, RotationConfig, ServerTimeout, StorageConfig, TraceConfig, XlineServerConfig, + AuthConfig, AutoCompactConfig, ClientTimeout, ClusterConfig, CompactConfig, + CurpConfigBuilder, LevelConfig, LogConfig, RotationConfig, ServerTimeout, StorageConfig, + TraceConfig, XlineServerConfig, }, parse_batch_bytes, parse_duration, parse_log_level, parse_members, parse_rotation, }; @@ -269,6 +270,15 @@ struct ServerArgs { /// Interval between two compaction operations [default: 10ms] #[clap(long, value_parser = parse_duration)] compact_sleep_interval: Option, + /// Auto compact mode + #[clap(long)] + auto_compact_mode: Option, + /// Auto periodic compact retention + #[clap(long, value_parser = parse_duration)] + auto_periodic_retention: Option, + /// Auto revision compact retention + #[clap(long)] + auto_revision_retention: Option, } impl From for XlineServerConfig { @@ -335,10 +345,32 @@ impl From for XlineServerConfig { args.jaeger_level, ); let auth = AuthConfig::new(args.auth_public_key, args.auth_private_key); + let auto_compactor_cfg = if let Some(mode) = args.auto_compact_mode { + match mode.as_str() { + "periodic" => { + let period = args.auto_periodic_retention.unwrap_or_else(|| { + panic!("missing auto_periodic_retention argument"); + }); + Some(AutoCompactConfig::Periodic(period)) + } + "revision" => { + let retention = args.auto_revision_retention.unwrap_or_else(|| { + panic!("missing auto_revision_retention argument"); + }); + Some(AutoCompactConfig::Revision(retention)) + } + _ => panic!( + "xline only supports two auto-compaction modes: periodic, revision" + ), + } + } else { + None + }; let compact = CompactConfig::new( args.compact_batch_size, args.compact_sleep_interval .unwrap_or_else(default_compact_sleep_interval), + auto_compactor_cfg, ); XlineServerConfig::new(cluster, storage, log, trace, auth, compact) } diff --git a/xline/src/server/kv_server.rs b/xline/src/server/kv_server.rs index 9ca15bea0..9d8e244a1 100644 --- a/xline/src/server/kv_server.rs +++ b/xline/src/server/kv_server.rs @@ -191,7 +191,7 @@ where ) -> Result<(), tonic::Status> { (range_revision <= 0 || range_revision >= compacted_revision) .then_some(()) - .ok_or(tonic::Status::invalid_argument(format!( + .ok_or(tonic::Status::out_of_range(format!( "required revision {range_revision} has been compacted, compacted revision is {compacted_revision}" ))) } @@ -437,18 +437,12 @@ mod test { ..Default::default() }; - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} is higher than current revision {}", - range_request_with_future_rev.revision, current_revision - )) - .to_string(); - let message = tonic::Status::from( + let expected_tonic_status = 
tonic::Status::from( range_request_with_future_rev .check_revision(compacted_revision, current_revision) .unwrap_err(), - ) - .to_string(); - assert_eq!(message, expected_err_message); + ); + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); let range_request_with_compacted_rev = RangeRequest { key: b"foo".to_vec(), @@ -456,19 +450,13 @@ mod test { ..Default::default() }; - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} has been compacted, compacted revision is {}", - range_request_with_compacted_rev.revision, compacted_revision - )) - .to_string(); - - let message = tonic::Status::from( + let expected_tonic_status = tonic::Status::from( range_request_with_compacted_rev .check_revision(compacted_revision, current_revision) .unwrap_err(), - ) - .to_string(); - assert_eq!(message, expected_err_message); + ); + + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); } #[tokio::test] @@ -487,20 +475,13 @@ mod test { failure: vec![], }; - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} is higher than current revision {}", - 20, current_revision - )) - .to_string(); - - let message = tonic::Status::from( + let expected_tonic_status = tonic::Status::from( txn_request_with_future_revision .check_revision(compacted_revision, current_revision) .unwrap_err(), - ) - .to_string(); + ); - assert_eq!(message, expected_err_message); + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); let txn_request_with_compacted_revision = TxnRequest { compare: vec![], @@ -514,20 +495,13 @@ mod test { failure: vec![], }; - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} has been compacted, compacted revision is {}", - 3, compacted_revision - )) - .to_string(); - - let message = tonic::Status::from( + let expected_tonic_status = tonic::Status::from( txn_request_with_compacted_revision .check_revision(compacted_revision, current_revision) .unwrap_err(), - ) - .to_string(); + ); - assert_eq!(message, expected_err_message); + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); } #[tokio::test] @@ -537,24 +511,12 @@ mod test { ..Default::default() }; - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} is higher than current revision {}", - compact_request.revision, 8 - )) - .to_string(); - - let message = - tonic::Status::from(compact_request.check_revision(3, 8).unwrap_err()).to_string(); - assert_eq!(message, expected_err_message); - - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} has been compacted, compacted revision is {}", - compact_request.revision, 13 - )) - .to_string(); - - let message = - tonic::Status::from(compact_request.check_revision(13, 18).unwrap_err()).to_string(); - assert_eq!(message, expected_err_message); + let expected_tonic_status = + tonic::Status::from(compact_request.check_revision(3, 8).unwrap_err()); + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); + + let expected_tonic_status = + tonic::Status::from(compact_request.check_revision(13, 18).unwrap_err()); + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); } } diff --git a/xline/src/server/watch_server.rs b/xline/src/server/watch_server.rs index 383a7b6eb..736574c72 100644 --- a/xline/src/server/watch_server.rs +++ b/xline/src/server/watch_server.rs @@ -198,17 +198,6 @@ where /// Handle `WatchCreateRequest` async fn handle_watch_create(&mut self, req: 
WatchCreateRequest) { - let compacted_revision = self.kv_watcher.compacted_revision(); - if req.start_revision < compacted_revision { - let result = Err(tonic::Status::invalid_argument(format!( - "required revision {} has been compacted, compacted revision is {}", - req.start_revision, compacted_revision - ))); - if self.response_tx.send(result).await.is_err() { - self.stop_notify.notify(1); - } - return; - } let Some(watch_id) = self.validate_watch_id(req.watch_id) else { let result = Err(tonic::Status::already_exists(format!( "Watch ID {} has already been used", @@ -250,7 +239,6 @@ where header: Some(self.header_gen.gen_header()), watch_id, created: true, - compact_revision: compacted_revision, ..WatchResponse::default() }; if self.response_tx.send(Ok(response)).await.is_err() { @@ -301,31 +289,38 @@ where /// Handle watch event async fn handle_watch_event(&mut self, mut watch_event: WatchEvent) { - let mut events = watch_event.take_events(); - if events.is_empty() { - return; - } let watch_id = watch_event.watch_id(); - if self.prev_kv.contains(&watch_id) { - for ev in &mut events { - if !ev.is_create() { - let kv = ev - .kv - .as_ref() - .unwrap_or_else(|| panic!("event.kv can't be None")); - ev.prev_kv = self.kv_watcher.get_prev_kv(kv); - } - } - } - let response = WatchResponse { + let mut response = WatchResponse { header: Some(ResponseHeader { revision: watch_event.revision(), ..ResponseHeader::default() }), watch_id, - events, ..WatchResponse::default() }; + if watch_event.compacted() { + response.compact_revision = self.kv_watcher.compacted_revision(); + response.canceled = true; + } else { + let mut events = watch_event.take_events(); + if events.is_empty() { + return; + } + + if self.prev_kv.contains(&watch_id) { + for ev in &mut events { + if !ev.is_create() { + let kv = ev + .kv + .as_ref() + .unwrap_or_else(|| panic!("event.kv can't be None")); + ev.prev_kv = self.kv_watcher.get_prev_kv(kv); + } + } + } + response.events = events; + }; + if self.response_tx.send(Ok(response)).await.is_err() { self.stop_notify.notify(1); } @@ -791,8 +786,7 @@ mod test { })), }; req_tx.send(Ok(create_watch_req(1, 2))).await.unwrap(); - req_tx.send(Ok(create_watch_req(2, 3))).await.unwrap(); - req_tx.send(Ok(create_watch_req(3, 4))).await.unwrap(); + // req_tx.send(Ok(create_watch_req(3, 4))).await.unwrap(); let (res_tx, mut res_rx) = mpsc::channel(CHANNEL_SIZE); let _hd = tokio::spawn(WatchServer::::task( Arc::clone(&next_id_gen), @@ -803,24 +797,24 @@ mod test { default_watch_progress_notify_interval(), )); - for i in 0..3 { - let watch_res = res_rx.recv().await.unwrap(); - if i == 0 { - if let Err(e) = watch_res { - assert_eq!(e.code(), tonic::Code::InvalidArgument, - "watch a compacted revision should return invalid_argument error, but found {e:?}" - ); - } else { - unreachable!( - "watch create request with a compacted revision should not be successful" - ) - } - } else { - assert!( - watch_res.is_ok(), - "watch create request with a valid revision should be successful" - ); - } - } + // It's allowed to create a compacted watch request, but it will immediately cancel. 
Doing so is for compatibility with etcdctl + let watch_create_success_res = res_rx.recv().await.unwrap().unwrap(); + assert!(watch_create_success_res.created); + assert_eq!(watch_create_success_res.watch_id, 1); + let watch_cancel_res = res_rx.recv().await.unwrap().unwrap(); + assert!(watch_cancel_res.canceled); + assert_eq!(watch_cancel_res.compact_revision, 3); + + req_tx.send(Ok(create_watch_req(2, 3))).await.unwrap(); + let watch_create_success_res = res_rx.recv().await.unwrap().unwrap(); + assert!(watch_create_success_res.created); + assert_eq!(watch_create_success_res.compact_revision, 0); + assert_eq!(watch_create_success_res.watch_id, 2); + + let watch_event_res = res_rx.recv().await.unwrap().unwrap(); + assert!(!watch_event_res.created); + assert!(!watch_event_res.canceled); + assert_eq!(watch_event_res.compact_revision, 0); + assert_eq!(watch_event_res.watch_id, 2); } } diff --git a/xline/src/server/xline_server.rs b/xline/src/server/xline_server.rs index 85e7ea101..f2bc28988 100644 --- a/xline/src/server/xline_server.rs +++ b/xline/src/server/xline_server.rs @@ -2,7 +2,9 @@ use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration}; use anyhow::Result; use clippy_utilities::{Cast, OverflowArithmetic}; -use curp::{members::ClusterMember, server::Rpc, ProtocolServer, SnapshotAllocator}; +use curp::{ + client::Client, members::ClusterMember, server::Rpc, ProtocolServer, SnapshotAllocator, +}; use event_listener::Event; use jsonwebtoken::{DecodingKey, EncodingKey}; use tokio::{net::TcpListener, sync::mpsc::channel}; @@ -31,7 +33,7 @@ use crate::{ }, state::State, storage::{ - compact::{compactor, COMPACT_CHANNEL_SIZE}, + compact::{auto_compactor, compact_bg_task, COMPACT_CHANNEL_SIZE}, index::Index, kvwatcher::KvWatcher, lease_store::LeaseCollection, @@ -44,6 +46,9 @@ use crate::{ /// Rpc Server of curp protocol type CurpServer = Rpc>; +/// Rpc Client of curp protocol +type CurpClient = Client; + /// Xline server #[derive(Debug)] pub struct XlineServer { @@ -134,7 +139,7 @@ impl XlineServer { compact_task_tx, Arc::clone(&lease_collection), )); - let _hd = tokio::spawn(compactor( + let _hd = tokio::spawn(compact_bg_task( Arc::clone(&kv_storage), Arc::clone(&index), *self.compact_cfg.compact_batch_size(), @@ -296,7 +301,6 @@ impl XlineServer { let index_barrier = Arc::new(IndexBarrier::new()); let id_barrier = Arc::new(IdBarrier::new()); - let state = State::new(Arc::clone(&lease_storage)); let ce = CommandExecutor::new( Arc::clone(&kv_storage), Arc::clone(&auth_storage), @@ -314,6 +318,32 @@ impl XlineServer { _ => unimplemented!(), }; + let client = Arc::new( + CurpClient::new( + Some(self.cluster_info.self_id().clone()), + self.cluster_info.all_members(), + self.client_timeout, + ) + .await, + ); + + let auto_compactor = if let Some(auto_compact_cfg) = *self.compact_cfg.auto_compact_config() + { + Some( + auto_compactor( + self.is_leader, + Arc::clone(&client), + header_gen.general_revision_arc(), + Arc::clone(&self.shutdown_trigger), + auto_compact_cfg, + ) + .await, + ) + } else { + None + }; + + let state = State::new(Arc::clone(&lease_storage), auto_compactor); let curp_server = CurpServer::new( Arc::clone(&self.cluster_info), self.is_leader, @@ -324,8 +354,6 @@ impl XlineServer { ) .await; - let client = Arc::new(curp_server.inner_client(self.client_timeout).await); - Ok(( KvServer::new( kv_storage, diff --git a/xline/src/state.rs b/xline/src/state.rs index fa5c3442a..44ebfa3f2 100644 --- a/xline/src/state.rs +++ b/xline/src/state.rs @@ -2,28 +2,42 @@ use 
std::{sync::Arc, time::Duration}; use curp::role_change::RoleChange; -use crate::storage::{storage_api::StorageApi, LeaseStore}; +use crate::storage::{compact::Compactor, storage_api::StorageApi, LeaseStore}; /// State of current node #[derive(Debug)] pub(crate) struct State { /// lease storage lease_storage: Arc>, + /// auto compactor + auto_compactor: Option>, } impl RoleChange for State { fn on_election_win(&self) { self.lease_storage.promote(Duration::from_secs(1)); // TODO: extend should be election timeout + if let Some(auto_compactor) = self.auto_compactor.as_ref() { + auto_compactor.resume(); + } } fn on_calibrate(&self) { self.lease_storage.demote(); + if let Some(auto_compactor) = self.auto_compactor.as_ref() { + auto_compactor.pause(); + } } } impl State { /// Create a new State - pub(super) fn new(lease_storage: Arc>) -> Self { - Self { lease_storage } + pub(super) fn new( + lease_storage: Arc>, + auto_compactor: Option>, + ) -> Self { + Self { + lease_storage, + auto_compactor, + } } } diff --git a/xline/src/storage/compact/mod.rs b/xline/src/storage/compact/mod.rs index 4c335ad9e..4036131e3 100644 --- a/xline/src/storage/compact/mod.rs +++ b/xline/src/storage/compact/mod.rs @@ -1,19 +1,113 @@ use std::{sync::Arc, time::Duration}; +use async_trait::async_trait; +use curp::{ + client::Client, + cmd::ProposeId, + error::CommandProposeError::{AfterSync, Execute}, +}; use event_listener::Event; +use periodic_compactor::PeriodicCompactor; +use revision_compactor::RevisionCompactor; use tokio::{sync::mpsc::Receiver, time::sleep}; +use utils::config::AutoCompactConfig; +use uuid::Uuid; use super::{ index::{Index, IndexOperate}, storage_api::StorageApi, - KvStore, + ExecuteError, KvStore, +}; +use crate::{ + revision_number::RevisionNumberGenerator, + rpc::{CompactionRequest, RequestWithToken}, + server::command::Command, }; +/// Revision compactor module +mod revision_compactor; + +/// Periodic compactor module +mod periodic_compactor; + /// compact task channel size pub(crate) const COMPACT_CHANNEL_SIZE: usize = 32; +/// Compactor trait definition +#[async_trait] +pub(crate) trait Compactor: std::fmt::Debug + Send + Sync { + /// run an auto-compactor + async fn run(&self); + /// pause an auto-compactor when the current node demotes to a non-leader role + fn pause(&self); + /// resume an auto-compactor when the current node becomes a leader + fn resume(&self); +} + +/// The `Compactable` trait defines a method that receives a given revision and proposes a compaction +#[cfg_attr(test, mockall::automock)] +#[async_trait] +pub(crate) trait Compactable: std::fmt::Debug + Send + Sync { + /// do compact + async fn compact(&self, revision: i64) -> Result<(), ExecuteError>; +} + +#[async_trait] +impl Compactable for Client { + async fn compact(&self, revision: i64) -> Result<(), ExecuteError> { + let request = CompactionRequest { + revision, + physical: false, + }; + let request_wrapper = RequestWithToken::new_with_token(request.into(), None); + let propose_id = ProposeId::new(format!("auto-compactor-{}", Uuid::new_v4())); + let cmd = Command::new(vec![], request_wrapper, propose_id); + if let Err(e) = self.propose(cmd).await { + #[allow(clippy::wildcard_enum_match_arm)] + match e { + Execute(e) | AfterSync(e) => Err(e), + _ => { + unreachable!("Compaction should not receive any errors other than ExecuteError, but it receives {e:?}"); + } + } + } else { + Ok(()) + } + } +} + +/// Boot up an auto-compactor background task. 
+pub(crate) async fn auto_compactor( + is_leader: bool, + client: Arc>, + revision_getter: Arc, + shutdown_trigger: Arc, + auto_compact_cfg: AutoCompactConfig, +) -> Arc { + let auto_compactor: Arc = match auto_compact_cfg { + AutoCompactConfig::Periodic(period) => { + PeriodicCompactor::new_arc(is_leader, client, revision_getter, shutdown_trigger, period) + } + AutoCompactConfig::Revision(retention) => RevisionCompactor::new_arc( + is_leader, + client, + revision_getter, + shutdown_trigger, + retention, + ), + _ => { + unreachable!("xline only supports two auto-compaction modes: periodic, revision") + } + }; + let compactor_handle = Arc::clone(&auto_compactor); + let _hd = tokio::spawn(async move { + auto_compactor.run().await; + }); + compactor_handle +} + /// background compact executor -pub(crate) async fn compactor( +pub(crate) async fn compact_bg_task( kv_store: Arc>, index: Arc, batch_limit: usize, diff --git a/xline/src/storage/compact/periodic_compactor.rs b/xline/src/storage/compact/periodic_compactor.rs new file mode 100644 index 000000000..76caf8812 --- /dev/null +++ b/xline/src/storage/compact/periodic_compactor.rs @@ -0,0 +1,294 @@ +use std::{ + cmp::Ordering, + sync::{ + atomic::{AtomicBool, Ordering::Relaxed}, + Arc, + }, + time::{Duration, Instant}, +}; + +use clippy_utilities::OverflowArithmetic; +use event_listener::Event; +use tracing::{info, warn}; + +use super::{Compactable, Compactor}; +use crate::{revision_number::RevisionNumberGenerator, storage::ExecuteError}; + +/// `RevisionWindow` is a ring buffer used to store periodically sampled revisions. +struct RevisionWindow { + /// inner ring buffer + ring_buf: Vec, + /// head index of the ring buffer + cursor: usize, + /// total number of retained samples + retention: usize, +} + +impl RevisionWindow { + /// Create a new `RevisionWindow` + fn new(retention: usize) -> Self { + Self { + ring_buf: vec![0; retention], + cursor: retention.overflow_sub(1), + retention, + } + } + + /// Store the revision into the inner ring buffer + #[allow(clippy::integer_arithmetic, clippy::indexing_slicing)] + fn sample(&mut self, revision: i64) { + self.cursor = (self.cursor + 1) % self.retention; // it's ok to do so since cursor will never overflow + self.ring_buf[self.cursor] = revision; + } + + /// Retrieve the expired revision that was sampled one period ago + #[allow(clippy::indexing_slicing, clippy::integer_arithmetic)] + fn expired_revision(&self) -> Option { + let target = self.ring_buf[(self.cursor + 1) % self.retention]; + if target == 0 { + None + } else { + Some(target) + } + } +} + +/// Periodic auto compactor +#[derive(Debug)] +pub(crate) struct PeriodicCompactor { + /// `is_leader` indicates whether the current node is a leader or not. 
+ is_leader: AtomicBool, + /// curp client + client: Arc, + /// revision getter + revision_getter: Arc, + /// shutdown trigger + shutdown_trigger: Arc, + /// compaction period + period: Duration, +} + +impl PeriodicCompactor { + /// Creates a new periodic compactor + pub(super) fn new_arc( + is_leader: bool, + client: Arc, + revision_getter: Arc, + shutdown_trigger: Arc, + period: Duration, + ) -> Arc { + Arc::new(Self { + is_leader: AtomicBool::new(is_leader), + client, + revision_getter, + shutdown_trigger, + period, + }) + } + + /// perform auto compaction logic + async fn do_compact( + &self, + last_revision: Option, + revision_window: &RevisionWindow, + ) -> Option { + if !self.is_leader.load(Relaxed) { + return None; + } + let target_revision = revision_window.expired_revision(); + if target_revision == last_revision { + return None; + } + let revision = + target_revision.unwrap_or_else(|| unreachable!("target revision shouldn't be None")); + let now = Instant::now(); + info!( + "starting auto periodic compaction, revision = {}, period = {:?}", + revision, self.period + ); + + if let Err(e) = self.client.compact(revision).await { + if let ExecuteError::RevisionCompacted(_rev, compacted_rev) = e { + info!( + "required revision {} has been compacted, the current compacted revision is {}, period = {:?}, took {:?}", + revision, + compacted_rev, + self.period, + now.elapsed().as_secs() + ); + Some(compacted_rev) + } else { + warn!( + "failed auto periodic compaction, revision = {}, period = {:?}, error: {:?}", + revision, self.period, e + ); + None + } + } else { + info!( + "completed auto periodic compaction, revision = {}, period = {:?}, took {:?}", + revision, + self.period, + now.elapsed().as_secs() + ); + target_revision + } + } +} + +/// Calculate the sample frequency and the total amount of samples. +fn sample_config(period: Duration) -> (Duration, usize) { + /// one hour duration + const ONEHOUR: Duration = Duration::from_secs(3600); + let base_interval = match period.cmp(&ONEHOUR) { + Ordering::Less => period, + Ordering::Equal | Ordering::Greater => ONEHOUR, + }; + let divisor = 10; + let check_interval = base_interval + .checked_div(divisor) + .unwrap_or_else(|| unreachable!("duration divisor should not be 0")); + let check_interval_secs = check_interval.as_secs(); + let periodic_secs = period.as_secs(); + let length = periodic_secs + .overflow_div(check_interval_secs) + .overflow_add(1); + let retention = + usize::try_from(length).unwrap_or_else(|e| panic!("auto compact period is too large: {e}")); + (check_interval, retention) +} + +#[async_trait::async_trait] +impl Compactor for PeriodicCompactor { + fn pause(&self) { + self.is_leader.store(false, Relaxed); + } + + fn resume(&self) { + self.is_leader.store(true, Relaxed); + } + + #[allow(clippy::integer_arithmetic)] + async fn run(&self) { + let mut last_revision: Option = None; + let shutdown_trigger = self.shutdown_trigger.listen(); + let (sample_frequency, sample_total) = sample_config(self.period); + let mut ticker = tokio::time::interval(sample_frequency); + let mut revision_window = RevisionWindow::new(sample_total); + tokio::pin!(shutdown_trigger); + loop { + revision_window.sample(self.revision_getter.get()); + tokio::select! 
{ + _ = ticker.tick() => { + if let Some(last_compacted_rev) = self.do_compact(last_revision, &revision_window).await { + last_revision = Some(last_compacted_rev); + } + } + // To ensure that each iteration invokes the same `shutdown_trigger` and keeps + // events from being lost due to the cancellation of `shutdown_trigger`. + _ = &mut shutdown_trigger => { + break; + } + } + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::storage::compact::MockCompactable; + + #[test] + fn revision_window_should_work() { + let mut rw = RevisionWindow::new(3); + assert!(rw.expired_revision().is_none()); + rw.sample(1); + assert!(rw.expired_revision().is_none()); + rw.sample(2); + assert!(rw.expired_revision().is_none()); + rw.sample(3); + assert_eq!(rw.expired_revision(), Some(1)); + // the window retains 3 samples + // samples in the first 3 minutes: 1, 2, 3 + // samples in the next 2 minutes: 3, 4 (the revision from one period ago is now 3) + rw.sample(3); + rw.sample(4); + assert_eq!(rw.expired_revision(), Some(3)); + } + + #[test] + fn sample_config_should_succeed() { + // period is 59 minutes, less than one hour + let (interval, retention) = sample_config(Duration::from_secs(59 * 60)); + assert_eq!(interval, Duration::from_secs(354)); + assert_eq!(retention, 11); + + // period is 60 minutes, equal to one hour + let (interval, retention) = sample_config(Duration::from_secs(60 * 60)); + assert_eq!(interval, Duration::from_secs(6 * 60)); + assert_eq!(retention, 11); + + // period is 24 hours, larger than one hour + let (interval, retention) = sample_config(Duration::from_secs(24 * 60 * 60)); + assert_eq!(interval, Duration::from_secs(6 * 60)); + assert_eq!(retention, 241); + } + + #[tokio::test] + async fn periodic_compactor_should_work_in_normal_path() { + let mut revision_window = RevisionWindow::new(11); + // revision_window: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] + for revision in 1..=11 { + revision_window.sample(revision); + } + let mut compactable = MockCompactable::new(); + compactable.expect_compact().times(3).returning(|_| Ok(())); + let shutdown_trigger = Arc::new(Event::new()); + let revision_gen = Arc::new(RevisionNumberGenerator::new(1)); + let periodic_compactor = PeriodicCompactor::new_arc( + true, + Arc::new(compactable), + revision_gen, + shutdown_trigger, + Duration::from_secs(10), + ); + // auto_compactor works successfully + assert_eq!( + periodic_compactor.do_compact(None, &revision_window).await, + Some(1) + ); + revision_window.sample(12); + assert_eq!( + periodic_compactor + .do_compact(Some(1), &revision_window) + .await, + Some(2) + ); + periodic_compactor.pause(); + revision_window.sample(13); + assert!(periodic_compactor + .do_compact(Some(2), &revision_window) + .await + .is_none()); + revision_window.sample(14); + assert!(periodic_compactor + .do_compact(Some(2), &revision_window) + .await + .is_none()); + periodic_compactor.resume(); + // revision_window: [12, 13, 14, 4, 5, 6, 7, 8, 9, 10, 11] + assert_eq!( + periodic_compactor + .do_compact(Some(3), &revision_window) + .await, + Some(4) + ); + + // auto compactor should skip those revisions which have been auto compacted. 
+ assert!(periodic_compactor + .do_compact(Some(4), &revision_window) + .await + .is_none()); + } +} diff --git a/xline/src/storage/compact/revision_compactor.rs b/xline/src/storage/compact/revision_compactor.rs new file mode 100644 index 000000000..6f38cc0a0 --- /dev/null +++ b/xline/src/storage/compact/revision_compactor.rs @@ -0,0 +1,161 @@ +use std::{ + sync::{ + atomic::{AtomicBool, Ordering::Relaxed}, + Arc, + }, + time::{Duration, Instant}, +}; + +use clippy_utilities::OverflowArithmetic; +use event_listener::Event; +use tracing::{info, warn}; + +use super::{Compactable, Compactor}; +use crate::{revision_number::RevisionNumberGenerator, storage::ExecuteError}; + +/// check for the need of compaction every 5 minutes +const CHECK_INTERVAL: Duration = Duration::from_secs(5 * 60); + +/// Revision auto compactor +#[derive(Debug)] +pub(crate) struct RevisionCompactor { + /// `is_leader` indicates whether the current node is a leader or not. + is_leader: AtomicBool, + /// curp client + client: Arc, + /// revision getter + revision_getter: Arc, + /// shutdown trigger + shutdown_trigger: Arc, + /// revision retention + retention: i64, +} + +impl RevisionCompactor { + /// Creates a new revision compactor + pub(super) fn new_arc( + is_leader: bool, + client: Arc, + revision_getter: Arc, + shutdown_trigger: Arc, + retention: i64, + ) -> Arc { + Arc::new(Self { + is_leader: AtomicBool::new(is_leader), + client, + revision_getter, + shutdown_trigger, + retention, + }) + } + + /// perform auto compaction logic + async fn do_compact(&self, last_revision: Option) -> Option { + if !self.is_leader.load(Relaxed) { + return None; + } + + let target_revision = self.revision_getter.get().overflow_sub(self.retention); + if target_revision <= 0 || Some(target_revision) <= last_revision { + return None; + } + + let now = Instant::now(); + info!( + "starting auto revision compaction, revision = {}, retention = {}", + target_revision, self.retention + ); + if let Err(e) = self.client.compact(target_revision).await { + if let ExecuteError::RevisionCompacted(_rev, compacted_rev) = e { + info!( + "required revision {} has been compacted, the current compacted revision is {}, retention = {:?}", + target_revision, + compacted_rev, + self.retention, + ); + Some(compacted_rev) + } else { + warn!( + "failed auto revision compaction, revision = {}, retention = {}, error: {:?}", + target_revision, self.retention, e + ); + None + } + } else { + info!( + "completed auto revision compaction, revision = {}, retention = {}, took {:?}", + target_revision, + self.retention, + now.elapsed().as_secs() + ); + Some(target_revision) + } + } +} + +#[async_trait::async_trait] +impl Compactor for RevisionCompactor { + fn pause(&self) { + self.is_leader.store(false, Relaxed); + } + + fn resume(&self) { + self.is_leader.store(true, Relaxed); + } + + #[allow(clippy::integer_arithmetic)] + async fn run(&self) { + let mut last_revision = None; + let shutdown_trigger = self.shutdown_trigger.listen(); + let mut ticker = tokio::time::interval(CHECK_INTERVAL); + tokio::pin!(shutdown_trigger); + loop { + tokio::select! { + _ = ticker.tick() => { + if let Some(last_compacted_rev) = self.do_compact(last_revision).await { + last_revision = Some(last_compacted_rev); + } + } + // To ensure that each iteration invokes the same `shutdown_trigger` and keeps + // events from being lost due to the cancellation of `shutdown_trigger`. 
+ _ = &mut shutdown_trigger => { + break; + } + } + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::storage::compact::MockCompactable; + + #[tokio::test] + async fn revision_compactor_should_work_in_normal_path() { + let mut compactable = MockCompactable::new(); + compactable.expect_compact().times(3).returning(|_| Ok(())); + let shutdown_trigger = Arc::new(Event::new()); + let revision_gen = Arc::new(RevisionNumberGenerator::new(110)); + let revision_compactor = RevisionCompactor::new_arc( + true, + Arc::new(compactable), + Arc::clone(&revision_gen), + shutdown_trigger, + 100, + ); + // auto_compactor works successfully + assert_eq!(revision_compactor.do_compact(None).await, Some(10)); + revision_gen.next(); // current revision: 111 + assert_eq!(revision_compactor.do_compact(Some(10)).await, Some(11)); + revision_compactor.pause(); + revision_gen.next(); // current revision 112 + assert!(revision_compactor.do_compact(Some(11)).await.is_none()); + revision_gen.next(); // current revision 113 + assert!(revision_compactor.do_compact(Some(11)).await.is_none()); + revision_compactor.resume(); + assert_eq!(revision_compactor.do_compact(Some(11)).await, Some(13)); + // auto compactor should skip those revisions which have been auto compacted. + assert!(revision_compactor.do_compact(Some(13)).await.is_none()); + } +} diff --git a/xline/src/storage/kv_store.rs b/xline/src/storage/kv_store.rs index b4eeab6c8..434c07acb 100644 --- a/xline/src/storage/kv_store.rs +++ b/xline/src/storage/kv_store.rs @@ -869,7 +869,7 @@ mod test { revision_number::RevisionNumberGenerator, rpc::{Request as UniRequest, RequestOp}, storage::{ - compact::{compactor, COMPACT_CHANNEL_SIZE}, + compact::{compact_bg_task, COMPACT_CHANNEL_SIZE}, db::DB, kvwatcher::KvWatcher, }, @@ -929,7 +929,7 @@ mod test { shutdown_trigger, Duration::from_millis(10), ); - let _compactor = tokio::spawn(compactor( + let _compactor = tokio::spawn(compact_bg_task( Arc::clone(&storage), index, 1000, diff --git a/xline/src/storage/kvwatcher.rs b/xline/src/storage/kvwatcher.rs index 0f373c9db..964553803 100644 --- a/xline/src/storage/kvwatcher.rs +++ b/xline/src/storage/kvwatcher.rs @@ -60,6 +60,8 @@ struct Watcher { stop_notify: Arc, /// Sender of watch event event_tx: mpsc::Sender, + /// Compacted flag + compacted: bool, } impl PartialEq for Watcher { @@ -85,6 +87,7 @@ impl Watcher { filters: Vec, stop_notify: Arc, event_tx: mpsc::Sender, + compacted: bool, ) -> Self { Self { key_range, @@ -93,6 +96,7 @@ impl Watcher { filters, stop_notify, event_tx, + compacted, } } @@ -123,24 +127,20 @@ impl Watcher { &mut self, (revision, events): (i64, Vec), ) -> Result<(), TrySendError> { - if revision < self.start_rev { - return Ok(()); - } - let events = self.filter_events(events); - if events.is_empty() { - return Ok(()); - } let watch_id = self.watch_id(); - debug!( - watch_id, - revision, - events_len = events.len(), - "try to send watch response" - ); + let events = self.filter_events(events); + let events_len = events.len(); let watch_event = WatchEvent { id: watch_id, events, revision, + compacted: self.compacted, + }; + if !self.compacted { + if revision < self.start_rev || 0 == events_len { + return Ok(()); + } + debug!(watch_id, revision, events_len, "try to send watch response"); }; match self.event_tx.try_send(watch_event) { @@ -238,6 +238,7 @@ impl WatcherMap { id: watch_id, revision: updates.0, events: updates.1, + compacted: false, }; assert!( self.victims @@ -317,6 +318,7 @@ where stop_notify: Arc, event_tx: mpsc::Sender, ) { 
+ let compacted = start_rev != 0 && start_rev < self.compacted_revision(); let mut watcher = Watcher::new( key_range.clone(), id, @@ -324,8 +326,22 @@ filters, stop_notify, event_tx, + compacted, ); let mut watcher_map_w = self.watcher_map.write(); + if compacted { + debug!("The revision required by {watcher:?} has been compacted"); + if let Err(TrySendError::Full(watch_event)) = watcher.notify((0, vec![])) { + assert!( + watcher_map_w + .victims + .insert(watcher, (watch_event.revision, watch_event.events)) + .is_none(), + "can't insert a watcher to victims twice" + ); + }; + return; + } let initial_events = if start_rev == 0 { vec![] @@ -446,7 +462,9 @@ where watche_id = watcher.watch_id(), "watcher synced by sync_victims_task" ); - watcher_map_w.register(watcher); + if !watcher.compacted { + watcher_map_w.register(watcher); + } } } if !new_victims.is_empty() { @@ -507,6 +525,8 @@ pub(crate) struct WatchEvent { events: Vec, /// Revision when this event is generated revision: i64, + /// Whether the `WatchEvent` is compacted + compacted: bool, } impl WatchEvent { @@ -524,6 +544,11 @@ pub(crate) fn take_events(&mut self) -> Vec { std::mem::take(&mut self.events) } + + /// Check whether the `WatchEvent` is a compacted `WatchEvent` or not. + pub(crate) fn compacted(&self) -> bool { + self.compacted + } } /// Get the last revision of a event slice diff --git a/xline/src/storage/mod.rs b/xline/src/storage/mod.rs index c42d40e44..1463cdcb1 100644 --- a/xline/src/storage/mod.rs +++ b/xline/src/storage/mod.rs @@ -1,7 +1,7 @@ /// Storage for Auth pub(crate) mod auth_store; -/// Compactor -pub(crate) mod compact; +/// Compact module +pub(super) mod compact; /// Database module pub mod db; /// Execute error @@ -22,7 +22,6 @@ pub(crate) mod snapshot_allocator; pub(crate) mod storage_api; pub use self::execute_error::ExecuteError; - pub(crate) use self::{ auth_store::AuthStore, kv_store::KvStore, lease_store::LeaseStore, revision::Revision, }; diff --git a/xline/tests/auth_test.rs b/xline/tests/auth_test.rs index 47c4b4f4f..0b0bcca5e 100644 --- a/xline/tests/auth_test.rs +++ b/xline/tests/auth_test.rs @@ -3,7 +3,6 @@ use std::error::Error; use etcd_client::{AuthClient, ConnectOptions, GetOptions, Permission, PermissionType}; use test_macros::abort_on_panic; use xline::client::kv_types::{PutRequest, RangeRequest}; - use xline_test_utils::Cluster; #[tokio::test(flavor = "multi_thread", worker_threads = 10)] diff --git a/xline/tests/kv_test.rs b/xline/tests/kv_test.rs index c2c41e83e..063119c4a 100644 --- a/xline/tests/kv_test.rs +++ b/xline/tests/kv_test.rs @@ -5,7 +5,6 @@ use test_macros::abort_on_panic; use xline::client::kv_types::{ DeleteRangeRequest, PutRequest, RangeRequest, SortOrder, SortTarget, }; - use xline_test_utils::Cluster; #[tokio::test(flavor = "multi_thread", worker_threads = 10)] diff --git a/xline/tests/lease_test.rs b/xline/tests/lease_test.rs index 2dd0bcf9f..f0ddbb969 100644 --- a/xline/tests/lease_test.rs +++ b/xline/tests/lease_test.rs @@ -3,7 +3,6 @@ use std::{error::Error, time::Duration}; use test_macros::abort_on_panic; use tracing::info; use xline::client::kv_types::{LeaseGrantRequest, PutRequest, RangeRequest}; - use xline_test_utils::Cluster; #[tokio::test(flavor = "multi_thread", worker_threads = 10)] diff --git a/xline/tests/watch_test.rs b/xline/tests/watch_test.rs index 9267c674a..0524cfd89 100644 --- a/xline/tests/watch_test.rs +++ b/xline/tests/watch_test.rs @@ -3,7 +3,6 @@ use std::error::Error; use etcd_client::EventType; use test_macros::abort_on_panic; use 
xline::client::kv_types::{DeleteRangeRequest, PutRequest}; - use xline_test_utils::Cluster; #[tokio::test(flavor = "multi_thread", worker_threads = 10)] diff --git a/xlineapi/src/lib.rs b/xlineapi/src/lib.rs index 421717b7f..8709a3553 100644 --- a/xlineapi/src/lib.rs +++ b/xlineapi/src/lib.rs @@ -181,7 +181,6 @@ mod leasepb { use serde::{Deserialize, Serialize}; -pub use self::etcdserverpb::range_request::{SortOrder, SortTarget}; pub use self::{ authpb::{permission::Type, Permission, Role, User, UserAddOptions}, etcdserverpb::{ @@ -193,6 +192,7 @@ pub use self::{ lease_server::{Lease, LeaseServer}, maintenance_client::MaintenanceClient, maintenance_server::{Maintenance, MaintenanceServer}, + range_request::{SortOrder, SortTarget}, request_op::Request, response_op::Response, watch_client::WatchClient, diff --git a/xlinectl/src/command/get.rs b/xlinectl/src/command/get.rs index af4f72598..7f0fd0911 100644 --- a/xlinectl/src/command/get.rs +++ b/xlinectl/src/command/get.rs @@ -117,9 +117,8 @@ pub(crate) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result #[cfg(test)] mod tests { - use crate::testcase_struct; - use super::*; + use crate::testcase_struct; testcase_struct!(RangeRequest); diff --git a/xlinectl/src/command/put.rs b/xlinectl/src/command/put.rs index 38871bf13..19f866768 100644 --- a/xlinectl/src/command/put.rs +++ b/xlinectl/src/command/put.rs @@ -50,9 +50,8 @@ pub(crate) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result #[cfg(test)] mod tests { - use crate::testcase_struct; - use super::*; + use crate::testcase_struct; testcase_struct!(PutRequest); diff --git a/xlinectl/src/main.rs b/xlinectl/src/main.rs index 0af5a6d51..8f4841ba5 100644 --- a/xlinectl/src/main.rs +++ b/xlinectl/src/main.rs @@ -158,9 +158,13 @@ use clap::{arg, value_parser, Command}; use ext_utils::config::ClientTimeout; use xline_client::{Client, ClientOptions}; -use crate::command::{get, put}; -use crate::utils::parser::{parse_endpoints, parse_user}; -use crate::utils::printer::{set_printer_type, PrinterType}; +use crate::{ + command::{get, put}, + utils::{ + parser::{parse_endpoints, parse_user}, + printer::{set_printer_type, PrinterType}, + }, +}; /// Command definitions and parsers mod command;
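
For reference, a minimal standalone sketch of the ring-buffer sampling scheme that the new `PeriodicCompactor` relies on, mirroring the `RevisionWindow` type added in xline/src/storage/compact/periodic_compactor.rs above (the `main` function and the `then_some`-based accessor are illustrative simplifications, not part of the patch):

/// Fixed-size ring buffer of periodically sampled revisions: every tick stores
/// the current revision, and the slot right after the cursor always holds the
/// sample taken one full period (i.e. `retention - 1` ticks) ago.
struct RevisionWindow {
    /// inner ring buffer
    ring_buf: Vec<i64>,
    /// index of the most recent sample
    cursor: usize,
    /// total number of retained samples
    retention: usize,
}

impl RevisionWindow {
    /// Create a window that retains `retention` samples (must be non-zero)
    fn new(retention: usize) -> Self {
        Self {
            ring_buf: vec![0; retention],
            cursor: retention - 1,
            retention,
        }
    }

    /// Store a sampled revision, overwriting the oldest slot
    fn sample(&mut self, revision: i64) {
        self.cursor = (self.cursor + 1) % self.retention;
        self.ring_buf[self.cursor] = revision;
    }

    /// The revision sampled one period ago; `None` until the window has wrapped
    fn expired_revision(&self) -> Option<i64> {
        let target = self.ring_buf[(self.cursor + 1) % self.retention];
        (target != 0).then_some(target)
    }
}

fn main() {
    let mut window = RevisionWindow::new(3);
    assert_eq!(window.expired_revision(), None); // window not full yet
    for revision in 1..=3 {
        window.sample(revision);
    }
    // With all 3 slots filled, the slot after the cursor is the oldest sample.
    assert_eq!(window.expired_revision(), Some(1));
    window.sample(4); // overwrites the oldest slot (revision 1)
    assert_eq!(window.expired_revision(), Some(2));
}

Sampling into a ring buffer lets the compactor look up the revision from exactly one period ago in O(1) time and space, without keeping a timestamp for every observed revision.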