diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index d67190d380d3..b8c9e648641e 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -121,6 +121,18 @@ pub enum Error { #[snafu(display("Corrupted data, error: {source}"))] CorruptedData { source: FromUtf8Error }, + + #[snafu(display("Failed to start the remove_outdated_meta method, error: {}", source))] + StartRemoveOutdatedMetaTask { + source: common_runtime::error::Error, + location: Location, + }, + + #[snafu(display("Failed to stop the remove_outdated_meta method, error: {}", source))] + StopRemoveOutdatedMetaTask { + source: common_runtime::error::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -145,6 +157,8 @@ impl ErrorExt for Error { } Error::ProcedurePanic { .. } | Error::CorruptedData { .. } => StatusCode::Unexpected, Error::ProcedureExec { source, .. } => source.status_code(), + Error::StartRemoveOutdatedMetaTask { source, .. } + | Error::StopRemoveOutdatedMetaTask { source, .. } => source.status_code(), } } diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 419ab9d14fa8..b054d76833fe 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -21,12 +21,16 @@ use std::time::{Duration, Instant}; use async_trait::async_trait; use backon::ExponentialBuilder; +use common_runtime::{RepeatedTask, TaskFunction}; use common_telemetry::logging; -use snafu::ensure; +use snafu::{ensure, ResultExt}; use tokio::sync::watch::{self, Receiver, Sender}; use tokio::sync::Notify; -use crate::error::{DuplicateProcedureSnafu, LoaderConflictSnafu, Result}; +use crate::error::{ + DuplicateProcedureSnafu, Error, LoaderConflictSnafu, Result, StartRemoveOutdatedMetaTaskSnafu, + StopRemoveOutdatedMetaTaskSnafu, +}; use crate::local::lock::LockMap; use crate::local::runner::Runner; use crate::procedure::BoxedProcedureLoader; @@ -341,6 +345,8 @@ impl ManagerContext { pub struct ManagerConfig { pub max_retry_times: usize, pub retry_delay: Duration, + pub remove_outdated_meta_task_interval: Duration, + pub remove_outdated_meta_ttl: Duration, } impl Default for ManagerConfig { @@ -348,6 +354,8 @@ impl Default for ManagerConfig { Self { max_retry_times: 3, retry_delay: Duration::from_millis(500), + remove_outdated_meta_task_interval: Duration::from_secs(60 * 10), + remove_outdated_meta_ttl: META_TTL, } } } @@ -358,16 +366,26 @@ pub struct LocalManager { state_store: StateStoreRef, max_retry_times: usize, retry_delay: Duration, + remove_outdated_meta_task: RepeatedTask, } impl LocalManager { /// Create a new [LocalManager] with specific `config`. pub fn new(config: ManagerConfig, state_store: StateStoreRef) -> LocalManager { + let manager_ctx = Arc::new(ManagerContext::new()); + let remove_outdated_meta_task = RepeatedTask::new( + config.remove_outdated_meta_task_interval, + Box::new(RemoveOutdatedMetaFunction { + manager_ctx: manager_ctx.clone(), + ttl: config.remove_outdated_meta_ttl, + }), + ); LocalManager { - manager_ctx: Arc::new(ManagerContext::new()), + manager_ctx, state_store, max_retry_times: config.max_retry_times, retry_delay: config.retry_delay, + remove_outdated_meta_task, } } @@ -419,6 +437,21 @@ impl ProcedureManager for LocalManager { Ok(()) } + fn start(&self) -> Result<()> { + self.remove_outdated_meta_task + .start(common_runtime::bg_runtime()) + .context(StartRemoveOutdatedMetaTaskSnafu)?; + Ok(()) + } + + async fn stop(&self) -> Result<()> { + self.remove_outdated_meta_task + .stop() + .await + .context(StopRemoveOutdatedMetaTaskSnafu)?; + Ok(()) + } + async fn submit(&self, procedure: ProcedureWithId) -> Result { let procedure_id = procedure.id; ensure!( @@ -426,9 +459,6 @@ impl ProcedureManager for LocalManager { DuplicateProcedureSnafu { procedure_id } ); - // TODO(yingwen): We can use a repeated task to remove outdated meta. - self.manager_ctx.remove_outdated_meta(META_TTL); - self.submit_root(procedure.id, 0, procedure.procedure) } @@ -487,18 +517,31 @@ impl ProcedureManager for LocalManager { } async fn procedure_state(&self, procedure_id: ProcedureId) -> Result> { - self.manager_ctx.remove_outdated_meta(META_TTL); - Ok(self.manager_ctx.state(procedure_id)) } fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option { - self.manager_ctx.remove_outdated_meta(META_TTL); - self.manager_ctx.watcher(procedure_id) } } +struct RemoveOutdatedMetaFunction { + manager_ctx: Arc, + ttl: Duration, +} + +#[async_trait::async_trait] +impl TaskFunction for RemoveOutdatedMetaFunction { + fn name(&self) -> &str { + "ProcedureManager-remove-outdated-meta-task" + } + + async fn call(&mut self) -> Result<()> { + self.manager_ctx.remove_outdated_meta(self.ttl); + Ok(()) + } +} + /// Create a new [ProcedureMeta] for test purpose. #[cfg(test)] mod test_util { @@ -639,6 +682,7 @@ mod tests { let config = ManagerConfig { max_retry_times: 3, retry_delay: Duration::from_millis(500), + ..Default::default() }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); @@ -660,6 +704,7 @@ mod tests { let config = ManagerConfig { max_retry_times: 3, retry_delay: Duration::from_millis(500), + ..Default::default() }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); let manager = LocalManager::new(config, state_store); @@ -706,6 +751,7 @@ mod tests { let config = ManagerConfig { max_retry_times: 3, retry_delay: Duration::from_millis(500), + ..Default::default() }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); @@ -754,6 +800,7 @@ mod tests { let config = ManagerConfig { max_retry_times: 3, retry_delay: Duration::from_millis(500), + ..Default::default() }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let manager = LocalManager::new(config, state_store); @@ -807,4 +854,59 @@ mod tests { check_procedure(MockProcedure { panic: false }).await; check_procedure(MockProcedure { panic: true }).await; } + + #[tokio::test] + async fn test_remove_outdated_meta_task() { + let dir = create_temp_dir("remove_outdated_meta_task"); + let object_store = test_util::new_object_store(&dir); + let config = ManagerConfig { + max_retry_times: 3, + retry_delay: Duration::from_millis(500), + remove_outdated_meta_task_interval: Duration::from_millis(1), + remove_outdated_meta_ttl: Duration::from_millis(1), + }; + let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); + let manager = LocalManager::new(config, state_store); + + let mut procedure = ProcedureToLoad::new("submit"); + procedure.lock_key = LockKey::single("test.submit"); + let procedure_id = ProcedureId::random(); + manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(procedure), + }) + .await + .unwrap(); + let mut watcher = manager.procedure_watcher(procedure_id).unwrap(); + watcher.changed().await.unwrap(); + manager.start().unwrap(); + tokio::time::sleep(Duration::from_millis(10)).await; + assert!(manager + .procedure_state(procedure_id) + .await + .unwrap() + .is_none()); + + // The remove_outdated_meta method has been stopped, so any procedure meta-data will not be automatically removed. + manager.stop().await.unwrap(); + let mut procedure = ProcedureToLoad::new("submit"); + procedure.lock_key = LockKey::single("test.submit"); + let procedure_id = ProcedureId::random(); + manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(procedure), + }) + .await + .unwrap(); + let mut watcher = manager.procedure_watcher(procedure_id).unwrap(); + watcher.changed().await.unwrap(); + tokio::time::sleep(Duration::from_millis(10)).await; + assert!(manager + .procedure_state(procedure_id) + .await + .unwrap() + .is_some()); + } } diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index bba0f1ba3276..6eaa075408cf 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -260,6 +260,10 @@ pub trait ProcedureManager: Send + Sync + 'static { /// Registers loader for specific procedure type `name`. fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()>; + fn start(&self) -> Result<()>; + + async fn stop(&self) -> Result<()>; + /// Submits a procedure to execute. /// /// Returns a [Watcher] to watch the created procedure. diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 0f8198fffed6..531ce0a885ea 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -482,6 +482,16 @@ pub enum Error { #[snafu(display("Payload not exist"))] PayloadNotExist { location: Location }, + + #[snafu(display("Failed to start the procedure manager"))] + StartProcedureManager { + source: common_procedure::error::Error, + }, + + #[snafu(display("Failed to stop the procedure manager"))] + StopProcedureManager { + source: common_procedure::error::Error, + }, } pub type Result = std::result::Result; @@ -581,6 +591,9 @@ impl ErrorExt for Error { source.status_code() } WaitProcedure { source, .. } => source.status_code(), + StartProcedureManager { source } | StopProcedureManager { source } => { + source.status_code() + } } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 95c79ded0438..3d09fa190de9 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -62,6 +62,7 @@ use crate::datanode::{ use crate::error::{ self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, ShutdownInstanceSnafu, + StartProcedureManagerSnafu, StopProcedureManagerSnafu, }; use crate::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use crate::heartbeat::handler::HandlerGroupExecutor; @@ -258,10 +259,17 @@ impl Instance { .recover() .await .context(RecoverProcedureSnafu)?; + self.procedure_manager + .start() + .context(StartProcedureManagerSnafu)?; Ok(()) } pub async fn shutdown(&self) -> Result<()> { + self.procedure_manager + .stop() + .await + .context(StopProcedureManagerSnafu)?; if let Some(heartbeat_task) = &self.heartbeat_task { heartbeat_task .close() @@ -568,6 +576,7 @@ pub(crate) async fn create_procedure_manager( let manager_config = ManagerConfig { max_retry_times: procedure_config.max_retry_times, retry_delay: procedure_config.retry_delay, + ..Default::default() }; Ok(Arc::new(LocalManager::new(manager_config, state_store))) diff --git a/src/table-procedure/src/test_util.rs b/src/table-procedure/src/test_util.rs index 63aee29281eb..3deff706ecf1 100644 --- a/src/table-procedure/src/test_util.rs +++ b/src/table-procedure/src/test_util.rs @@ -73,6 +73,7 @@ impl TestEnv { let config = ManagerConfig { max_retry_times: 3, retry_delay: Duration::from_secs(500), + ..Default::default() }; let state_store = Arc::new(ObjectStateStore::new(object_store)); let procedure_manager = Arc::new(LocalManager::new(config, state_store));