Skip to content

Commit

Permalink
feat: make RepeatedTask invoke remove_outdated_meta method (GreptimeT…
Browse files Browse the repository at this point in the history
…eam#1578)

* feat: make RepeatedTask invoke remove_outdated_meta method

* fix: typo

* chore: improve error message
  • Loading branch information
NiwakaDev committed May 16, 2023
1 parent 122bd5f commit 856ab5b
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 10 deletions.
14 changes: 14 additions & 0 deletions src/common/procedure/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ pub enum Error {

#[snafu(display("Corrupted data, error: {source}"))]
CorruptedData { source: FromUtf8Error },

#[snafu(display("Failed to start the remove_outdated_meta method, error: {}", source))]
StartRemoveOutdatedMetaTask {
source: common_runtime::error::Error,
location: Location,
},

#[snafu(display("Failed to stop the remove_outdated_meta method, error: {}", source))]
StopRemoveOutdatedMetaTask {
source: common_runtime::error::Error,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -145,6 +157,8 @@ impl ErrorExt for Error {
}
Error::ProcedurePanic { .. } | Error::CorruptedData { .. } => StatusCode::Unexpected,
Error::ProcedureExec { source, .. } => source.status_code(),
Error::StartRemoveOutdatedMetaTask { source, .. }
| Error::StopRemoveOutdatedMetaTask { source, .. } => source.status_code(),
}
}

Expand Down
122 changes: 112 additions & 10 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ use std::time::{Duration, Instant};

use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::logging;
use snafu::ensure;
use snafu::{ensure, ResultExt};
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::Notify;

use crate::error::{DuplicateProcedureSnafu, LoaderConflictSnafu, Result};
use crate::error::{
DuplicateProcedureSnafu, Error, LoaderConflictSnafu, Result, StartRemoveOutdatedMetaTaskSnafu,
StopRemoveOutdatedMetaTaskSnafu,
};
use crate::local::lock::LockMap;
use crate::local::runner::Runner;
use crate::procedure::BoxedProcedureLoader;
Expand Down Expand Up @@ -341,13 +345,17 @@ impl ManagerContext {
pub struct ManagerConfig {
pub max_retry_times: usize,
pub retry_delay: Duration,
pub remove_outdated_meta_task_interval: Duration,
pub remove_outdated_meta_ttl: Duration,
}

impl Default for ManagerConfig {
fn default() -> Self {
Self {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_secs(60 * 10),
remove_outdated_meta_ttl: META_TTL,
}
}
}
Expand All @@ -358,16 +366,26 @@ pub struct LocalManager {
state_store: StateStoreRef,
max_retry_times: usize,
retry_delay: Duration,
remove_outdated_meta_task: RepeatedTask<Error>,
}

impl LocalManager {
/// Create a new [LocalManager] with specific `config`.
pub fn new(config: ManagerConfig, state_store: StateStoreRef) -> LocalManager {
let manager_ctx = Arc::new(ManagerContext::new());
let remove_outdated_meta_task = RepeatedTask::new(
config.remove_outdated_meta_task_interval,
Box::new(RemoveOutdatedMetaFunction {
manager_ctx: manager_ctx.clone(),
ttl: config.remove_outdated_meta_ttl,
}),
);
LocalManager {
manager_ctx: Arc::new(ManagerContext::new()),
manager_ctx,
state_store,
max_retry_times: config.max_retry_times,
retry_delay: config.retry_delay,
remove_outdated_meta_task,
}
}

Expand Down Expand Up @@ -419,16 +437,28 @@ impl ProcedureManager for LocalManager {
Ok(())
}

fn start(&self) -> Result<()> {
self.remove_outdated_meta_task
.start(common_runtime::bg_runtime())
.context(StartRemoveOutdatedMetaTaskSnafu)?;
Ok(())
}

async fn stop(&self) -> Result<()> {
self.remove_outdated_meta_task
.stop()
.await
.context(StopRemoveOutdatedMetaTaskSnafu)?;
Ok(())
}

async fn submit(&self, procedure: ProcedureWithId) -> Result<Watcher> {
let procedure_id = procedure.id;
ensure!(
!self.manager_ctx.contains_procedure(procedure_id),
DuplicateProcedureSnafu { procedure_id }
);

// TODO(yingwen): We can use a repeated task to remove outdated meta.
self.manager_ctx.remove_outdated_meta(META_TTL);

self.submit_root(procedure.id, 0, procedure.procedure)
}

Expand Down Expand Up @@ -487,18 +517,31 @@ impl ProcedureManager for LocalManager {
}

async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
self.manager_ctx.remove_outdated_meta(META_TTL);

Ok(self.manager_ctx.state(procedure_id))
}

fn procedure_watcher(&self, procedure_id: ProcedureId) -> Option<Watcher> {
self.manager_ctx.remove_outdated_meta(META_TTL);

self.manager_ctx.watcher(procedure_id)
}
}

struct RemoveOutdatedMetaFunction {
manager_ctx: Arc<ManagerContext>,
ttl: Duration,
}

#[async_trait::async_trait]
impl TaskFunction<Error> for RemoveOutdatedMetaFunction {
fn name(&self) -> &str {
"ProcedureManager-remove-outdated-meta-task"
}

async fn call(&mut self) -> Result<()> {
self.manager_ctx.remove_outdated_meta(self.ttl);
Ok(())
}
}

/// Create a new [ProcedureMeta] for test purpose.
#[cfg(test)]
mod test_util {
Expand Down Expand Up @@ -639,6 +682,7 @@ mod tests {
let config = ManagerConfig {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
Expand All @@ -660,6 +704,7 @@ mod tests {
let config = ManagerConfig {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let manager = LocalManager::new(config, state_store);
Expand Down Expand Up @@ -706,6 +751,7 @@ mod tests {
let config = ManagerConfig {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
Expand Down Expand Up @@ -754,6 +800,7 @@ mod tests {
let config = ManagerConfig {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
Expand Down Expand Up @@ -807,4 +854,59 @@ mod tests {
check_procedure(MockProcedure { panic: false }).await;
check_procedure(MockProcedure { panic: true }).await;
}

#[tokio::test]
async fn test_remove_outdated_meta_task() {
let dir = create_temp_dir("remove_outdated_meta_task");
let object_store = test_util::new_object_store(&dir);
let config = ManagerConfig {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
remove_outdated_meta_task_interval: Duration::from_millis(1),
remove_outdated_meta_ttl: Duration::from_millis(1),
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let manager = LocalManager::new(config, state_store);

let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
let procedure_id = ProcedureId::random();
manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(procedure),
})
.await
.unwrap();
let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
watcher.changed().await.unwrap();
manager.start().unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
assert!(manager
.procedure_state(procedure_id)
.await
.unwrap()
.is_none());

// The remove_outdated_meta method has been stopped, so any procedure meta-data will not be automatically removed.
manager.stop().await.unwrap();
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single("test.submit");
let procedure_id = ProcedureId::random();
manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(procedure),
})
.await
.unwrap();
let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
watcher.changed().await.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
assert!(manager
.procedure_state(procedure_id)
.await
.unwrap()
.is_some());
}
}
4 changes: 4 additions & 0 deletions src/common/procedure/src/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ pub trait ProcedureManager: Send + Sync + 'static {
/// Registers loader for specific procedure type `name`.
fn register_loader(&self, name: &str, loader: BoxedProcedureLoader) -> Result<()>;

fn start(&self) -> Result<()>;

async fn stop(&self) -> Result<()>;

/// Submits a procedure to execute.
///
/// Returns a [Watcher] to watch the created procedure.
Expand Down
13 changes: 13 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,16 @@ pub enum Error {

#[snafu(display("Payload not exist"))]
PayloadNotExist { location: Location },

#[snafu(display("Failed to start the procedure manager"))]
StartProcedureManager {
source: common_procedure::error::Error,
},

#[snafu(display("Failed to stop the procedure manager"))]
StopProcedureManager {
source: common_procedure::error::Error,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -581,6 +591,9 @@ impl ErrorExt for Error {
source.status_code()
}
WaitProcedure { source, .. } => source.status_code(),
StartProcedureManager { source } | StopProcedureManager { source } => {
source.status_code()
}
}
}

Expand Down
9 changes: 9 additions & 0 deletions src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use crate::datanode::{
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, ShutdownInstanceSnafu,
StartProcedureManagerSnafu, StopProcedureManagerSnafu,
};
use crate::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use crate::heartbeat::handler::HandlerGroupExecutor;
Expand Down Expand Up @@ -258,10 +259,17 @@ impl Instance {
.recover()
.await
.context(RecoverProcedureSnafu)?;
self.procedure_manager
.start()
.context(StartProcedureManagerSnafu)?;
Ok(())
}

pub async fn shutdown(&self) -> Result<()> {
self.procedure_manager
.stop()
.await
.context(StopProcedureManagerSnafu)?;
if let Some(heartbeat_task) = &self.heartbeat_task {
heartbeat_task
.close()
Expand Down Expand Up @@ -568,6 +576,7 @@ pub(crate) async fn create_procedure_manager(
let manager_config = ManagerConfig {
max_retry_times: procedure_config.max_retry_times,
retry_delay: procedure_config.retry_delay,
..Default::default()
};

Ok(Arc::new(LocalManager::new(manager_config, state_store)))
Expand Down
1 change: 1 addition & 0 deletions src/table-procedure/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl TestEnv {
let config = ManagerConfig {
max_retry_times: 3,
retry_delay: Duration::from_secs(500),
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(object_store));
let procedure_manager = Arc::new(LocalManager::new(config, state_store));
Expand Down

0 comments on commit 856ab5b

Please sign in to comment.