Skip to content

Commit

Permalink
Fix task::*::cancel if the RPC task is an awaiting status (#1582)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeyboyko0791 authored Dec 26, 2022
1 parent e5827f5 commit 4f1190b
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 38 deletions.
2 changes: 1 addition & 1 deletion mm2src/coins/rpc_command/hd_account_balance_rpc_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl From<AddressDerivingError> for HDAccountBalanceRpcError {
impl From<RpcTaskError> for HDAccountBalanceRpcError {
fn from(e: RpcTaskError) -> Self {
match e {
RpcTaskError::Canceled => HDAccountBalanceRpcError::Internal("Canceled".to_owned()),
RpcTaskError::Cancelled => HDAccountBalanceRpcError::Internal("Cancelled".to_owned()),
RpcTaskError::Timeout(timeout) => HDAccountBalanceRpcError::Timeout(timeout),
RpcTaskError::NoSuchTask(_)
// `UnexpectedTaskStatus` and `UnexpectedUserAction` are not expected at the balance request.
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/rpc_command/init_create_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl From<RpcTaskError> for CreateAccountRpcError {
fn from(e: RpcTaskError) -> Self {
let error = e.to_string();
match e {
RpcTaskError::Canceled => CreateAccountRpcError::Internal("Canceled".to_owned()),
RpcTaskError::Cancelled => CreateAccountRpcError::Internal("Cancelled".to_owned()),
RpcTaskError::Timeout(timeout) => CreateAccountRpcError::Timeout(timeout),
RpcTaskError::NoSuchTask(_) | RpcTaskError::UnexpectedTaskStatus { .. } => {
CreateAccountRpcError::Internal(error)
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/utxo/utxo_withdraw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl From<RpcTaskError> for WithdrawError {
fn from(e: RpcTaskError) -> Self {
let error = e.to_string();
match e {
RpcTaskError::Canceled => WithdrawError::InternalError("Canceled".to_owned()),
RpcTaskError::Cancelled => WithdrawError::InternalError("Cancelled".to_owned()),
RpcTaskError::Timeout(timeout) => WithdrawError::Timeout(timeout),
RpcTaskError::NoSuchTask(_) | RpcTaskError::UnexpectedTaskStatus { .. } => {
WithdrawError::InternalError(error)
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_main/src/lp_init/init_hw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl From<RpcTaskError> for InitHwError {
fn from(e: RpcTaskError) -> Self {
let error = e.to_string();
match e {
RpcTaskError::Canceled => InitHwError::Internal("Canceled".to_owned()),
RpcTaskError::Cancelled => InitHwError::Internal("Cancelled".to_owned()),
RpcTaskError::Timeout(timeout) => InitHwError::Timeout(timeout),
RpcTaskError::NoSuchTask(_) | RpcTaskError::UnexpectedTaskStatus { .. } => InitHwError::Internal(error),
RpcTaskError::UnexpectedUserAction { expected } => InitHwError::UnexpectedUserAction { expected },
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_main/src/lp_init/init_metamask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl From<RpcTaskError> for InitMetamaskError {
fn from(e: RpcTaskError) -> Self {
let error = e.to_string();
match e {
RpcTaskError::Canceled => InitMetamaskError::Internal("Canceled".to_owned()),
RpcTaskError::Cancelled => InitMetamaskError::Internal("Cancelled".to_owned()),
RpcTaskError::Timeout(timeout) => InitMetamaskError::Timeout(timeout),
RpcTaskError::NoSuchTask(_) | RpcTaskError::UnexpectedTaskStatus { .. } => {
InitMetamaskError::Internal(error)
Expand Down
4 changes: 2 additions & 2 deletions mm2src/mm2_main/src/lp_native_dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl From<AdexBehaviourError> for P2PInitError {
#[derive(Clone, Debug, Display, EnumFromTrait, Serialize, SerializeErrorType)]
#[serde(tag = "error_type", content = "error_data")]
pub enum MmInitError {
Canceled,
Cancelled,
#[from_trait(WithTimeout::timeout)]
#[display(fmt = "Initialization timeout {:?}", _0)]
Timeout(Duration),
Expand Down Expand Up @@ -224,7 +224,7 @@ impl From<RpcTaskError> for MmInitError {
fn from(e: RpcTaskError) -> Self {
let error = e.to_string();
match e {
RpcTaskError::Canceled => MmInitError::Canceled,
RpcTaskError::Cancelled => MmInitError::Cancelled,
RpcTaskError::Timeout(timeout) => MmInitError::Timeout(timeout),
RpcTaskError::NoSuchTask(_)
| RpcTaskError::UnexpectedTaskStatus { .. }
Expand Down
7 changes: 6 additions & 1 deletion mm2src/rpc_task/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl<Task: RpcTask> RpcTaskHandle<Task> {
user_action_rx
.timeout(timeout)
.await?
.map_to_mm(|_canceled| RpcTaskError::Canceled)
.map_to_mm(|_canceled| RpcTaskError::Cancelled)
}

pub(crate) fn finish(self, result: Result<Task::Item, MmError<Task::Error>>) {
Expand All @@ -62,6 +62,11 @@ impl<Task: RpcTask> RpcTaskHandle<Task> {
.warn_log();
}

pub(crate) fn on_cancelled(self) {
self.lock_and_then(|mut task_manager| task_manager.on_task_cancelling_finished(self.task_id))
.warn_log();
}

fn prepare_task_result(result: Result<Task::Item, MmError<Task::Error>>) -> TaskStatus<Task> {
match result {
Ok(task_item) => TaskStatus::Ok(task_item),
Expand Down
3 changes: 2 additions & 1 deletion mm2src/rpc_task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub enum RpcTaskError {
UnexpectedUserAction {
expected: String,
},
Canceled,
Cancelled,
Internal(String),
}

Expand All @@ -61,6 +61,7 @@ pub enum TaskStatusError {
Idle,
InProgress,
AwaitingUserAction,
Cancelled,
Finished,
}

Expand Down
96 changes: 67 additions & 29 deletions mm2src/rpc_task/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::task::RpcTaskTypes;
use crate::{AtomicTaskId, RpcTask, RpcTaskError, RpcTaskHandle, RpcTaskResult, RpcTaskStatus, RpcTaskStatusAlias,
TaskAbortHandle, TaskAbortHandler, TaskId, TaskStatus, TaskStatusError, UserActionSender};
use common::executor::SpawnFuture;
use common::log::{debug, info, warn};
use common::log::{debug, info};
use futures::channel::oneshot;
use futures::future::{select, Either};
use mm2_err_handle::prelude::*;
Expand All @@ -11,6 +11,16 @@ use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex, Weak};

macro_rules! unexpected_task_status {
($task_id:expr, actual = $actual:ident, expected = $expected:ident) => {
MmError::err(RpcTaskError::UnexpectedTaskStatus {
task_id: $task_id,
actual: TaskStatusError::$actual,
expected: TaskStatusError::$expected,
})
};
}

pub type RpcTaskManagerShared<Task> = Arc<Mutex<RpcTaskManager<Task>>>;
pub(crate) type RpcTaskManagerWeak<Task> = Weak<Mutex<RpcTaskManager<Task>>>;

Expand Down Expand Up @@ -64,6 +74,7 @@ impl<Task: RpcTask> RpcTaskManager<Task> {
None => {
info!("RPC task '{}' has been aborted", task_id);
task.cancel().await;
task_handle.on_cancelled();
},
}
};
Expand All @@ -80,6 +91,9 @@ impl<Task: RpcTask> RpcTaskManager<Task> {
let rpc_status = match entry.get() {
TaskStatusExt::InProgress { status, .. } => RpcTaskStatus::InProgress(status.clone()),
TaskStatusExt::Awaiting { status, .. } => RpcTaskStatus::UserActionRequired(status.clone()),
// Don't return an `RpcTaskStatus::Cancelled` status,
// instead return `None` as there is no such task for the user already.
TaskStatusExt::Cancelling { .. } => return None,
TaskStatusExt::Ok(result) => RpcTaskStatus::Ok(result.clone()),
TaskStatusExt::Error(error) => RpcTaskStatus::Error(error.clone()),
};
Expand All @@ -95,22 +109,33 @@ impl<Task: RpcTask> RpcTaskManager<Task> {

/// Cancel task if it's in progress.
pub fn cancel_task(&mut self, task_id: TaskId) -> RpcTaskResult<()> {
let task_status = match self.tasks.entry(task_id) {
Entry::Occupied(task_status) => task_status,
Entry::Vacant(_) => return MmError::err(RpcTaskError::NoSuchTask(task_id)),
};
let task = self.tasks.remove(&task_id);

if task_status.get().is_ready() {
return MmError::err(RpcTaskError::UnexpectedTaskStatus {
task_id,
actual: TaskStatusError::Finished,
expected: TaskStatusError::InProgress,
});
match task {
Some(finished_task @ TaskStatusExt::Ok(_) | finished_task @ TaskStatusExt::Error(_)) => {
// Return the task result to the `tasks` container.
self.tasks.insert(task_id, finished_task);
unexpected_task_status!(task_id, actual = Finished, expected = InProgress)
},
Some(TaskStatusExt::InProgress { .. }) => {
let new_task = TaskStatusExt::Cancelling { _action_sender: None };
self.tasks.insert(task_id, new_task);
Ok(())
},
Some(TaskStatusExt::Awaiting { action_sender, .. }) => {
let new_task = TaskStatusExt::Cancelling {
_action_sender: Some(action_sender),
};
self.tasks.insert(task_id, new_task);
Ok(())
},
Some(cancelling_task @ TaskStatusExt::Cancelling { .. }) => {
// Return the task result to the `tasks` container.
self.tasks.insert(task_id, cancelling_task);
unexpected_task_status!(task_id, actual = Cancelled, expected = InProgress)
},
None => MmError::err(RpcTaskError::NoSuchTask(task_id)),
}

// The task will be aborted when the `TaskAbortHandle` is dropped.
task_status.remove();
Ok(())
}

pub(crate) fn register_task(
Expand All @@ -120,11 +145,7 @@ impl<Task: RpcTask> RpcTaskManager<Task> {
let task_id = next_rpc_task_id();
let (abort_handle, abort_handler) = oneshot::channel();
match self.tasks.entry(task_id) {
Entry::Occupied(_entry) => MmError::err(RpcTaskError::UnexpectedTaskStatus {
task_id,
actual: TaskStatusError::InProgress,
expected: TaskStatusError::Idle,
}),
Entry::Occupied(_entry) => unexpected_task_status!(task_id, actual = InProgress, expected = Idle),
Entry::Vacant(entry) => {
entry.insert(TaskStatusExt::InProgress {
status: task_initial_in_progress_status,
Expand All @@ -147,6 +168,16 @@ impl<Task: RpcTask> RpcTaskManager<Task> {
}
}

pub(crate) fn on_task_cancelling_finished(&mut self, task_id: TaskId) -> RpcTaskResult<()> {
match self.tasks.remove(&task_id) {
Some(TaskStatusExt::Cancelling { .. }) => Ok(()),
_ => {
let error = format!("Cancelled task '{task_id}' was not in `Cancelling` status");
MmError::err(RpcTaskError::Internal(error))
},
}
}

fn on_task_finished(
&mut self,
task_id: TaskId,
Expand All @@ -157,7 +188,8 @@ impl<Task: RpcTask> RpcTaskManager<Task> {
Err(error) => TaskStatusExt::Error(error),
};
if self.tasks.insert(task_id, task_status).is_none() {
warn!("Finished task '{}' was not ongoing", task_id);
let error = format!("Finished task '{task_id}' was not ongoing");
return MmError::err(RpcTaskError::Internal(error));
}
Ok(())
}
Expand All @@ -171,14 +203,15 @@ impl<Task: RpcTask> RpcTaskManager<Task> {
.insert(task_id, TaskStatusExt::InProgress { status, abort_handle });
Ok(())
},
Some(cancelling @ TaskStatusExt::Cancelling { .. }) => {
// Return the task result to the tasks container.
self.tasks.insert(task_id, cancelling);
unexpected_task_status!(task_id, actual = Cancelled, expected = InProgress)
},
Some(ready @ TaskStatusExt::Ok(_) | ready @ TaskStatusExt::Error(_)) => {
// Return the task result to the tasks container.
self.tasks.insert(task_id, ready);
MmError::err(RpcTaskError::UnexpectedTaskStatus {
task_id,
actual: TaskStatusError::Finished,
expected: TaskStatusError::InProgress,
})
unexpected_task_status!(task_id, actual = Finished, expected = InProgress)
},
None => MmError::err(RpcTaskError::NoSuchTask(task_id)),
}
Expand Down Expand Up @@ -233,7 +266,7 @@ impl<Task: RpcTask> RpcTaskManager<Task> {
let result = action_sender
.send(user_action)
// The task seems to be canceled/aborted for some reason.
.map_to_mm(|_user_action| RpcTaskError::Canceled);
.map_to_mm(|_user_action| RpcTaskError::Cancelled);
// Insert new in-progress status to the tasks container.
self.tasks
.insert(task_id, TaskStatusExt::InProgress { status, abort_handle });
Expand Down Expand Up @@ -272,16 +305,21 @@ enum TaskStatusExt<Task: RpcTaskTypes> {
abort_handle: TaskAbortHandle,
next_in_progress_status: Task::InProgressStatus,
},
/// `Cancelling` status is set on [`RpcTaskManager::cancel_task`].
/// This status is used to save the task state before it's actually canceled on [`RpcTaskHandle::on_canceled`],
/// in particular, to fix https://github.com/KomodoPlatform/atomicDEX-API/issues/1580.
Cancelling {
_action_sender: Option<UserActionSender<Task::UserAction>>,
},
}

impl<Task: RpcTaskTypes> TaskStatusExt<Task> {
fn is_ready(&self) -> bool { matches!(self, TaskStatusExt::Ok(_) | TaskStatusExt::Error(_)) }

fn task_status_err(&self) -> TaskStatusError {
match self {
TaskStatusExt::Ok(_) | TaskStatusExt::Error(_) => TaskStatusError::Finished,
TaskStatusExt::InProgress { .. } => TaskStatusError::InProgress,
TaskStatusExt::Awaiting { .. } => TaskStatusError::AwaitingUserAction,
TaskStatusExt::Cancelling { .. } => TaskStatusError::Cancelled,
}
}
}

0 comments on commit 4f1190b

Please sign in to comment.