diff --git a/mm2src/coins/rpc_command/hd_account_balance_rpc_error.rs b/mm2src/coins/rpc_command/hd_account_balance_rpc_error.rs index 37c4598c9c..2c09748cbc 100644 --- a/mm2src/coins/rpc_command/hd_account_balance_rpc_error.rs +++ b/mm2src/coins/rpc_command/hd_account_balance_rpc_error.rs @@ -97,7 +97,7 @@ impl From for HDAccountBalanceRpcError { impl From for HDAccountBalanceRpcError { fn from(e: RpcTaskError) -> Self { match e { - RpcTaskError::Canceled => HDAccountBalanceRpcError::Internal("Canceled".to_owned()), + RpcTaskError::Cancelled => HDAccountBalanceRpcError::Internal("Cancelled".to_owned()), RpcTaskError::Timeout(timeout) => HDAccountBalanceRpcError::Timeout(timeout), RpcTaskError::NoSuchTask(_) // `UnexpectedTaskStatus` and `UnexpectedUserAction` are not expected at the balance request. diff --git a/mm2src/coins/rpc_command/init_create_account.rs b/mm2src/coins/rpc_command/init_create_account.rs index 2e8f5c63f6..46707074a7 100644 --- a/mm2src/coins/rpc_command/init_create_account.rs +++ b/mm2src/coins/rpc_command/init_create_account.rs @@ -121,7 +121,7 @@ impl From for CreateAccountRpcError { fn from(e: RpcTaskError) -> Self { let error = e.to_string(); match e { - RpcTaskError::Canceled => CreateAccountRpcError::Internal("Canceled".to_owned()), + RpcTaskError::Cancelled => CreateAccountRpcError::Internal("Cancelled".to_owned()), RpcTaskError::Timeout(timeout) => CreateAccountRpcError::Timeout(timeout), RpcTaskError::NoSuchTask(_) | RpcTaskError::UnexpectedTaskStatus { .. } => { CreateAccountRpcError::Internal(error) diff --git a/mm2src/coins/utxo/utxo_withdraw.rs b/mm2src/coins/utxo/utxo_withdraw.rs index 5e5842c954..3d4c0acc91 100644 --- a/mm2src/coins/utxo/utxo_withdraw.rs +++ b/mm2src/coins/utxo/utxo_withdraw.rs @@ -78,7 +78,7 @@ impl From for WithdrawError { fn from(e: RpcTaskError) -> Self { let error = e.to_string(); match e { - RpcTaskError::Canceled => WithdrawError::InternalError("Canceled".to_owned()), + RpcTaskError::Cancelled => WithdrawError::InternalError("Cancelled".to_owned()), RpcTaskError::Timeout(timeout) => WithdrawError::Timeout(timeout), RpcTaskError::NoSuchTask(_) | RpcTaskError::UnexpectedTaskStatus { .. } => { WithdrawError::InternalError(error) diff --git a/mm2src/mm2_main/src/lp_init/init_hw.rs b/mm2src/mm2_main/src/lp_init/init_hw.rs index 5a1d4b02f2..726bea7967 100644 --- a/mm2src/mm2_main/src/lp_init/init_hw.rs +++ b/mm2src/mm2_main/src/lp_init/init_hw.rs @@ -66,7 +66,7 @@ impl From for InitHwError { fn from(e: RpcTaskError) -> Self { let error = e.to_string(); match e { - RpcTaskError::Canceled => InitHwError::Internal("Canceled".to_owned()), + RpcTaskError::Cancelled => InitHwError::Internal("Cancelled".to_owned()), RpcTaskError::Timeout(timeout) => InitHwError::Timeout(timeout), RpcTaskError::NoSuchTask(_) | RpcTaskError::UnexpectedTaskStatus { .. } => InitHwError::Internal(error), RpcTaskError::UnexpectedUserAction { expected } => InitHwError::UnexpectedUserAction { expected }, diff --git a/mm2src/mm2_main/src/lp_init/init_metamask.rs b/mm2src/mm2_main/src/lp_init/init_metamask.rs index 0abf47c507..90c6dfc4de 100644 --- a/mm2src/mm2_main/src/lp_init/init_metamask.rs +++ b/mm2src/mm2_main/src/lp_init/init_metamask.rs @@ -57,7 +57,7 @@ impl From for InitMetamaskError { fn from(e: RpcTaskError) -> Self { let error = e.to_string(); match e { - RpcTaskError::Canceled => InitMetamaskError::Internal("Canceled".to_owned()), + RpcTaskError::Cancelled => InitMetamaskError::Internal("Cancelled".to_owned()), RpcTaskError::Timeout(timeout) => InitMetamaskError::Timeout(timeout), RpcTaskError::NoSuchTask(_) | RpcTaskError::UnexpectedTaskStatus { .. } => { InitMetamaskError::Internal(error) diff --git a/mm2src/mm2_main/src/lp_native_dex.rs b/mm2src/mm2_main/src/lp_native_dex.rs index 66a0801b7c..3a132a35b9 100644 --- a/mm2src/mm2_main/src/lp_native_dex.rs +++ b/mm2src/mm2_main/src/lp_native_dex.rs @@ -108,7 +108,7 @@ impl From for P2PInitError { #[derive(Clone, Debug, Display, EnumFromTrait, Serialize, SerializeErrorType)] #[serde(tag = "error_type", content = "error_data")] pub enum MmInitError { - Canceled, + Cancelled, #[from_trait(WithTimeout::timeout)] #[display(fmt = "Initialization timeout {:?}", _0)] Timeout(Duration), @@ -224,7 +224,7 @@ impl From for MmInitError { fn from(e: RpcTaskError) -> Self { let error = e.to_string(); match e { - RpcTaskError::Canceled => MmInitError::Canceled, + RpcTaskError::Cancelled => MmInitError::Cancelled, RpcTaskError::Timeout(timeout) => MmInitError::Timeout(timeout), RpcTaskError::NoSuchTask(_) | RpcTaskError::UnexpectedTaskStatus { .. } diff --git a/mm2src/rpc_task/src/handle.rs b/mm2src/rpc_task/src/handle.rs index 91174e134d..23586499da 100644 --- a/mm2src/rpc_task/src/handle.rs +++ b/mm2src/rpc_task/src/handle.rs @@ -53,7 +53,7 @@ impl RpcTaskHandle { user_action_rx .timeout(timeout) .await? - .map_to_mm(|_canceled| RpcTaskError::Canceled) + .map_to_mm(|_canceled| RpcTaskError::Cancelled) } pub(crate) fn finish(self, result: Result>) { @@ -62,6 +62,11 @@ impl RpcTaskHandle { .warn_log(); } + pub(crate) fn on_cancelled(self) { + self.lock_and_then(|mut task_manager| task_manager.on_task_cancelling_finished(self.task_id)) + .warn_log(); + } + fn prepare_task_result(result: Result>) -> TaskStatus { match result { Ok(task_item) => TaskStatus::Ok(task_item), diff --git a/mm2src/rpc_task/src/lib.rs b/mm2src/rpc_task/src/lib.rs index e7b6715c04..ebc4c20bf6 100644 --- a/mm2src/rpc_task/src/lib.rs +++ b/mm2src/rpc_task/src/lib.rs @@ -52,7 +52,7 @@ pub enum RpcTaskError { UnexpectedUserAction { expected: String, }, - Canceled, + Cancelled, Internal(String), } @@ -61,6 +61,7 @@ pub enum TaskStatusError { Idle, InProgress, AwaitingUserAction, + Cancelled, Finished, } diff --git a/mm2src/rpc_task/src/manager.rs b/mm2src/rpc_task/src/manager.rs index 5696a786b8..207c1ffba6 100644 --- a/mm2src/rpc_task/src/manager.rs +++ b/mm2src/rpc_task/src/manager.rs @@ -2,7 +2,7 @@ use crate::task::RpcTaskTypes; use crate::{AtomicTaskId, RpcTask, RpcTaskError, RpcTaskHandle, RpcTaskResult, RpcTaskStatus, RpcTaskStatusAlias, TaskAbortHandle, TaskAbortHandler, TaskId, TaskStatus, TaskStatusError, UserActionSender}; use common::executor::SpawnFuture; -use common::log::{debug, info, warn}; +use common::log::{debug, info}; use futures::channel::oneshot; use futures::future::{select, Either}; use mm2_err_handle::prelude::*; @@ -11,6 +11,16 @@ use std::collections::HashMap; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex, Weak}; +macro_rules! unexpected_task_status { + ($task_id:expr, actual = $actual:ident, expected = $expected:ident) => { + MmError::err(RpcTaskError::UnexpectedTaskStatus { + task_id: $task_id, + actual: TaskStatusError::$actual, + expected: TaskStatusError::$expected, + }) + }; +} + pub type RpcTaskManagerShared = Arc>>; pub(crate) type RpcTaskManagerWeak = Weak>>; @@ -64,6 +74,7 @@ impl RpcTaskManager { None => { info!("RPC task '{}' has been aborted", task_id); task.cancel().await; + task_handle.on_cancelled(); }, } }; @@ -80,6 +91,9 @@ impl RpcTaskManager { let rpc_status = match entry.get() { TaskStatusExt::InProgress { status, .. } => RpcTaskStatus::InProgress(status.clone()), TaskStatusExt::Awaiting { status, .. } => RpcTaskStatus::UserActionRequired(status.clone()), + // Don't return an `RpcTaskStatus::Cancelled` status, + // instead return `None` as there is no such task for the user already. + TaskStatusExt::Cancelling { .. } => return None, TaskStatusExt::Ok(result) => RpcTaskStatus::Ok(result.clone()), TaskStatusExt::Error(error) => RpcTaskStatus::Error(error.clone()), }; @@ -95,22 +109,33 @@ impl RpcTaskManager { /// Cancel task if it's in progress. pub fn cancel_task(&mut self, task_id: TaskId) -> RpcTaskResult<()> { - let task_status = match self.tasks.entry(task_id) { - Entry::Occupied(task_status) => task_status, - Entry::Vacant(_) => return MmError::err(RpcTaskError::NoSuchTask(task_id)), - }; + let task = self.tasks.remove(&task_id); - if task_status.get().is_ready() { - return MmError::err(RpcTaskError::UnexpectedTaskStatus { - task_id, - actual: TaskStatusError::Finished, - expected: TaskStatusError::InProgress, - }); + match task { + Some(finished_task @ TaskStatusExt::Ok(_) | finished_task @ TaskStatusExt::Error(_)) => { + // Return the task result to the `tasks` container. + self.tasks.insert(task_id, finished_task); + unexpected_task_status!(task_id, actual = Finished, expected = InProgress) + }, + Some(TaskStatusExt::InProgress { .. }) => { + let new_task = TaskStatusExt::Cancelling { _action_sender: None }; + self.tasks.insert(task_id, new_task); + Ok(()) + }, + Some(TaskStatusExt::Awaiting { action_sender, .. }) => { + let new_task = TaskStatusExt::Cancelling { + _action_sender: Some(action_sender), + }; + self.tasks.insert(task_id, new_task); + Ok(()) + }, + Some(cancelling_task @ TaskStatusExt::Cancelling { .. }) => { + // Return the task result to the `tasks` container. + self.tasks.insert(task_id, cancelling_task); + unexpected_task_status!(task_id, actual = Cancelled, expected = InProgress) + }, + None => MmError::err(RpcTaskError::NoSuchTask(task_id)), } - - // The task will be aborted when the `TaskAbortHandle` is dropped. - task_status.remove(); - Ok(()) } pub(crate) fn register_task( @@ -120,11 +145,7 @@ impl RpcTaskManager { let task_id = next_rpc_task_id(); let (abort_handle, abort_handler) = oneshot::channel(); match self.tasks.entry(task_id) { - Entry::Occupied(_entry) => MmError::err(RpcTaskError::UnexpectedTaskStatus { - task_id, - actual: TaskStatusError::InProgress, - expected: TaskStatusError::Idle, - }), + Entry::Occupied(_entry) => unexpected_task_status!(task_id, actual = InProgress, expected = Idle), Entry::Vacant(entry) => { entry.insert(TaskStatusExt::InProgress { status: task_initial_in_progress_status, @@ -147,6 +168,16 @@ impl RpcTaskManager { } } + pub(crate) fn on_task_cancelling_finished(&mut self, task_id: TaskId) -> RpcTaskResult<()> { + match self.tasks.remove(&task_id) { + Some(TaskStatusExt::Cancelling { .. }) => Ok(()), + _ => { + let error = format!("Cancelled task '{task_id}' was not in `Cancelling` status"); + MmError::err(RpcTaskError::Internal(error)) + }, + } + } + fn on_task_finished( &mut self, task_id: TaskId, @@ -157,7 +188,8 @@ impl RpcTaskManager { Err(error) => TaskStatusExt::Error(error), }; if self.tasks.insert(task_id, task_status).is_none() { - warn!("Finished task '{}' was not ongoing", task_id); + let error = format!("Finished task '{task_id}' was not ongoing"); + return MmError::err(RpcTaskError::Internal(error)); } Ok(()) } @@ -171,14 +203,15 @@ impl RpcTaskManager { .insert(task_id, TaskStatusExt::InProgress { status, abort_handle }); Ok(()) }, + Some(cancelling @ TaskStatusExt::Cancelling { .. }) => { + // Return the task result to the tasks container. + self.tasks.insert(task_id, cancelling); + unexpected_task_status!(task_id, actual = Cancelled, expected = InProgress) + }, Some(ready @ TaskStatusExt::Ok(_) | ready @ TaskStatusExt::Error(_)) => { // Return the task result to the tasks container. self.tasks.insert(task_id, ready); - MmError::err(RpcTaskError::UnexpectedTaskStatus { - task_id, - actual: TaskStatusError::Finished, - expected: TaskStatusError::InProgress, - }) + unexpected_task_status!(task_id, actual = Finished, expected = InProgress) }, None => MmError::err(RpcTaskError::NoSuchTask(task_id)), } @@ -233,7 +266,7 @@ impl RpcTaskManager { let result = action_sender .send(user_action) // The task seems to be canceled/aborted for some reason. - .map_to_mm(|_user_action| RpcTaskError::Canceled); + .map_to_mm(|_user_action| RpcTaskError::Cancelled); // Insert new in-progress status to the tasks container. self.tasks .insert(task_id, TaskStatusExt::InProgress { status, abort_handle }); @@ -272,16 +305,21 @@ enum TaskStatusExt { abort_handle: TaskAbortHandle, next_in_progress_status: Task::InProgressStatus, }, + /// `Cancelling` status is set on [`RpcTaskManager::cancel_task`]. + /// This status is used to save the task state before it's actually canceled on [`RpcTaskHandle::on_canceled`], + /// in particular, to fix https://github.com/KomodoPlatform/atomicDEX-API/issues/1580. + Cancelling { + _action_sender: Option>, + }, } impl TaskStatusExt { - fn is_ready(&self) -> bool { matches!(self, TaskStatusExt::Ok(_) | TaskStatusExt::Error(_)) } - fn task_status_err(&self) -> TaskStatusError { match self { TaskStatusExt::Ok(_) | TaskStatusExt::Error(_) => TaskStatusError::Finished, TaskStatusExt::InProgress { .. } => TaskStatusError::InProgress, TaskStatusExt::Awaiting { .. } => TaskStatusError::AwaitingUserAction, + TaskStatusExt::Cancelling { .. } => TaskStatusError::Cancelled, } } }