Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 8, 2024
1 parent 74c3f9d commit f18c726
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 36 deletions.
44 changes: 25 additions & 19 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use tokio::sync::{Mutex as TokioMutex, Notify};

use self::rwlock::KeyRwLock;
use crate::error::{
DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result,
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result,
StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
};
use crate::local::runner::Runner;
Expand Down Expand Up @@ -74,11 +74,11 @@ pub(crate) struct ProcedureMeta {
impl ProcedureMeta {
fn new(
id: ProcedureId,
init_state: InitProcedureState,
procedure_state: ProcedureState,
parent_id: Option<ProcedureId>,
lock_key: LockKey,
) -> ProcedureMeta {
let (state_sender, state_receiver) = watch::channel(init_state.into());
let (state_sender, state_receiver) = watch::channel(procedure_state);
ProcedureMeta {
id,
parent_id,
Expand Down Expand Up @@ -429,15 +429,15 @@ impl LocalManager {
fn submit_root(
&self,
procedure_id: ProcedureId,
init_state: InitProcedureState,
procedure_state: ProcedureState,
step: u32,
procedure: BoxedProcedure,
) -> Result<Watcher> {
ensure!(self.manager_ctx.running(), ManagerNotStartSnafu);

let meta = Arc::new(ProcedureMeta::new(
procedure_id,
init_state,
procedure_state,
None,
procedure.lock_key(),
));
Expand Down Expand Up @@ -479,13 +479,11 @@ impl LocalManager {
Ok(watcher)
}

fn submit_recovered_messages<F>(
fn submit_recovered_messages(
&self,
messages: HashMap<ProcedureId, ProcedureMessage>,
init_state_loader: F,
) where
F: Fn(&ProcedureMessage) -> InitProcedureState,
{
init_state: InitProcedureState,
) {
for (procedure_id, message) in &messages {
if message.parent_id.is_none() {
// This is the root procedure. We only submit the root procedure as it will
Expand All @@ -505,9 +503,21 @@ impl LocalManager {
loaded_procedure.step
);

let procedure_state = match init_state {
InitProcedureState::RollingBack => ProcedureState::RollingBack {
error: Arc::new(
error::ProcedureRecoveredAfterFailsSnafu {
error: message.error.clone().unwrap_or("Unknown error".to_string()),
}
.build(),
),
},
InitProcedureState::Running => ProcedureState::Running,
};

if let Err(e) = self.submit_root(
*procedure_id,
init_state_loader(message),
procedure_state,
loaded_procedure.step,
loaded_procedure.procedure,
) {
Expand All @@ -525,12 +535,8 @@ impl LocalManager {
let (messages, rollback_messages, finished_ids) =
self.procedure_store.load_messages().await?;
// Submits recovered messages first.
self.submit_recovered_messages(rollback_messages, |message| {
InitProcedureState::RollingBack(
message.error.clone().unwrap_or("Unknown error".to_string()),
)
});
self.submit_recovered_messages(messages, |_| InitProcedureState::Running);
self.submit_recovered_messages(rollback_messages, InitProcedureState::RollingBack);
self.submit_recovered_messages(messages, InitProcedureState::Running);

if !finished_ids.is_empty() {
logging::info!(
Expand Down Expand Up @@ -617,7 +623,7 @@ impl ProcedureManager for LocalManager {

self.submit_root(
procedure.id,
InitProcedureState::Running,
ProcedureState::Running,
0,
procedure.procedure,
)
Expand Down Expand Up @@ -661,7 +667,7 @@ pub(crate) mod test_util {
pub(crate) fn procedure_meta_for_test() -> ProcedureMeta {
ProcedureMeta::new(
ProcedureId::random(),
InitProcedureState::Running,
ProcedureState::Running,
None,
LockKey::default(),
)
Expand Down
8 changes: 4 additions & 4 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tokio::time;
use super::rwlock::OwnedKeyRwLockGuard;
use crate::error::{self, ProcedurePanicSnafu, Result};
use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::procedure::{InitProcedureState, Output, StringKey};
use crate::procedure::{Output, StringKey};
use crate::store::{ProcedureMessage, ProcedureStore};
use crate::{BoxedProcedure, Context, Error, ProcedureId, ProcedureState, ProcedureWithId, Status};

Expand Down Expand Up @@ -335,7 +335,7 @@ impl Runner {
fn submit_subprocedure(
&self,
procedure_id: ProcedureId,
init_state: InitProcedureState,
procedure_state: ProcedureState,
mut procedure: BoxedProcedure,
) {
if self.manager_ctx.contains_procedure(procedure_id) {
Expand All @@ -357,7 +357,7 @@ impl Runner {

let meta = Arc::new(ProcedureMeta::new(
procedure_id,
init_state,
procedure_state,
Some(self.meta.id),
procedure.lock_key(),
));
Expand Down Expand Up @@ -417,7 +417,7 @@ impl Runner {

self.submit_subprocedure(
subprocedure.id,
InitProcedureState::Running,
ProcedureState::Running,
subprocedure.procedure,
);
}
Expand Down
15 changes: 2 additions & 13 deletions src/common/procedure/src/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub trait Procedure: Send {
error::RollbackNotSupportedSnafu {}.fail()
}

/// Indices whether support to rollback the procedure.
/// Indicates whether it supports rolling back the procedure.
fn is_support_rollback(&self) -> bool {
false
}
Expand Down Expand Up @@ -371,22 +371,11 @@ impl ProcedureState {
}
}

impl From<InitProcedureState> for ProcedureState {
fn from(value: InitProcedureState) -> Self {
match value {
InitProcedureState::Running => ProcedureState::Running,
InitProcedureState::RollingBack(error) => ProcedureState::RollingBack {
error: Arc::new(error::ProcedureRecoveredAfterFailsSnafu { error }.build()),
},
}
}
}

/// The initial procedure state.
#[derive(Debug, Clone)]
pub enum InitProcedureState {
Running,
RollingBack(String),
RollingBack,
}

// TODO(yingwen): Shutdown
Expand Down

0 comments on commit f18c726

Please sign in to comment.