Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[r2r] Fix task::*::cancel if the RPC task is an awaiting status #1582

Merged
merged 1 commit into from
Dec 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mm2src/coins/rpc_command/hd_account_balance_rpc_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl From<AddressDerivingError> for HDAccountBalanceRpcError {
impl From<RpcTaskError> for HDAccountBalanceRpcError {
fn from(e: RpcTaskError) -> Self {
match e {
RpcTaskError::Canceled => HDAccountBalanceRpcError::Internal("Canceled".to_owned()),
RpcTaskError::Cancelled => HDAccountBalanceRpcError::Internal("Cancelled".to_owned()),
RpcTaskError::Timeout(timeout) => HDAccountBalanceRpcError::Timeout(timeout),
RpcTaskError::NoSuchTask(_)
// `UnexpectedTaskStatus` and `UnexpectedUserAction` are not expected at the balance request.
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/rpc_command/init_create_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl From<RpcTaskError> for CreateAccountRpcError {
fn from(e: RpcTaskError) -> Self {
let error = e.to_string();
match e {
RpcTaskError::Canceled => CreateAccountRpcError::Internal("Canceled".to_owned()),
RpcTaskError::Cancelled => CreateAccountRpcError::Internal("Cancelled".to_owned()),
RpcTaskError::Timeout(timeout) => CreateAccountRpcError::Timeout(timeout),
RpcTaskError::NoSuchTask(_) | RpcTaskError::UnexpectedTaskStatus { .. } => {
CreateAccountRpcError::Internal(error)
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/utxo/utxo_withdraw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl From<RpcTaskError> for WithdrawError {
fn from(e: RpcTaskError) -> Self {
let error = e.to_string();
match e {
RpcTaskError::Canceled => WithdrawError::InternalError("Canceled".to_owned()),
RpcTaskError::Cancelled => WithdrawError::InternalError("Cancelled".to_owned()),
RpcTaskError::Timeout(timeout) => WithdrawError::Timeout(timeout),
RpcTaskError::NoSuchTask(_) | RpcTaskError::UnexpectedTaskStatus { .. } => {
WithdrawError::InternalError(error)
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_main/src/lp_init/init_hw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl From<RpcTaskError> for InitHwError {
fn from(e: RpcTaskError) -> Self {
let error = e.to_string();
match e {
RpcTaskError::Canceled => InitHwError::Internal("Canceled".to_owned()),
RpcTaskError::Cancelled => InitHwError::Internal("Cancelled".to_owned()),
RpcTaskError::Timeout(timeout) => InitHwError::Timeout(timeout),
RpcTaskError::NoSuchTask(_) | RpcTaskError::UnexpectedTaskStatus { .. } => InitHwError::Internal(error),
RpcTaskError::UnexpectedUserAction { expected } => InitHwError::UnexpectedUserAction { expected },
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_main/src/lp_init/init_metamask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl From<RpcTaskError> for InitMetamaskError {
fn from(e: RpcTaskError) -> Self {
let error = e.to_string();
match e {
RpcTaskError::Canceled => InitMetamaskError::Internal("Canceled".to_owned()),
RpcTaskError::Cancelled => InitMetamaskError::Internal("Cancelled".to_owned()),
RpcTaskError::Timeout(timeout) => InitMetamaskError::Timeout(timeout),
RpcTaskError::NoSuchTask(_) | RpcTaskError::UnexpectedTaskStatus { .. } => {
InitMetamaskError::Internal(error)
Expand Down
4 changes: 2 additions & 2 deletions mm2src/mm2_main/src/lp_native_dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl From<AdexBehaviourError> for P2PInitError {
#[derive(Clone, Debug, Display, EnumFromTrait, Serialize, SerializeErrorType)]
#[serde(tag = "error_type", content = "error_data")]
pub enum MmInitError {
Canceled,
Cancelled,
#[from_trait(WithTimeout::timeout)]
#[display(fmt = "Initialization timeout {:?}", _0)]
Timeout(Duration),
Expand Down Expand Up @@ -224,7 +224,7 @@ impl From<RpcTaskError> for MmInitError {
fn from(e: RpcTaskError) -> Self {
let error = e.to_string();
match e {
RpcTaskError::Canceled => MmInitError::Canceled,
RpcTaskError::Cancelled => MmInitError::Cancelled,
RpcTaskError::Timeout(timeout) => MmInitError::Timeout(timeout),
RpcTaskError::NoSuchTask(_)
| RpcTaskError::UnexpectedTaskStatus { .. }
Expand Down
7 changes: 6 additions & 1 deletion mm2src/rpc_task/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl<Task: RpcTask> RpcTaskHandle<Task> {
user_action_rx
.timeout(timeout)
.await?
.map_to_mm(|_canceled| RpcTaskError::Canceled)
.map_to_mm(|_canceled| RpcTaskError::Cancelled)
}

pub(crate) fn finish(self, result: Result<Task::Item, MmError<Task::Error>>) {
Expand All @@ -62,6 +62,11 @@ impl<Task: RpcTask> RpcTaskHandle<Task> {
.warn_log();
}

pub(crate) fn on_cancelled(self) {
self.lock_and_then(|mut task_manager| task_manager.on_task_cancelling_finished(self.task_id))
.warn_log();
}

fn prepare_task_result(result: Result<Task::Item, MmError<Task::Error>>) -> TaskStatus<Task> {
match result {
Ok(task_item) => TaskStatus::Ok(task_item),
Expand Down
3 changes: 2 additions & 1 deletion mm2src/rpc_task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub enum RpcTaskError {
UnexpectedUserAction {
expected: String,
},
Canceled,
Cancelled,
Internal(String),
}

Expand All @@ -61,6 +61,7 @@ pub enum TaskStatusError {
Idle,
InProgress,
AwaitingUserAction,
Cancelled,
Finished,
}

Expand Down
96 changes: 67 additions & 29 deletions mm2src/rpc_task/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::task::RpcTaskTypes;
use crate::{AtomicTaskId, RpcTask, RpcTaskError, RpcTaskHandle, RpcTaskResult, RpcTaskStatus, RpcTaskStatusAlias,
TaskAbortHandle, TaskAbortHandler, TaskId, TaskStatus, TaskStatusError, UserActionSender};
use common::executor::SpawnFuture;
use common::log::{debug, info, warn};
use common::log::{debug, info};
use futures::channel::oneshot;
use futures::future::{select, Either};
use mm2_err_handle::prelude::*;
Expand All @@ -11,6 +11,16 @@ use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex, Weak};

macro_rules! unexpected_task_status {
($task_id:expr, actual = $actual:ident, expected = $expected:ident) => {
MmError::err(RpcTaskError::UnexpectedTaskStatus {
task_id: $task_id,
actual: TaskStatusError::$actual,
expected: TaskStatusError::$expected,
})
};
}

pub type RpcTaskManagerShared<Task> = Arc<Mutex<RpcTaskManager<Task>>>;
pub(crate) type RpcTaskManagerWeak<Task> = Weak<Mutex<RpcTaskManager<Task>>>;

Expand Down Expand Up @@ -64,6 +74,7 @@ impl<Task: RpcTask> RpcTaskManager<Task> {
None => {
info!("RPC task '{}' has been aborted", task_id);
task.cancel().await;
task_handle.on_cancelled();
},
}
};
Expand All @@ -80,6 +91,9 @@ impl<Task: RpcTask> RpcTaskManager<Task> {
let rpc_status = match entry.get() {
TaskStatusExt::InProgress { status, .. } => RpcTaskStatus::InProgress(status.clone()),
TaskStatusExt::Awaiting { status, .. } => RpcTaskStatus::UserActionRequired(status.clone()),
// Don't return an `RpcTaskStatus::Cancelled` status,
// instead return `None` as there is no such task for the user already.
TaskStatusExt::Cancelling { .. } => return None,
TaskStatusExt::Ok(result) => RpcTaskStatus::Ok(result.clone()),
TaskStatusExt::Error(error) => RpcTaskStatus::Error(error.clone()),
};
Expand All @@ -95,22 +109,33 @@ impl<Task: RpcTask> RpcTaskManager<Task> {

/// Cancel task if it's in progress.
pub fn cancel_task(&mut self, task_id: TaskId) -> RpcTaskResult<()> {
let task_status = match self.tasks.entry(task_id) {
Entry::Occupied(task_status) => task_status,
Entry::Vacant(_) => return MmError::err(RpcTaskError::NoSuchTask(task_id)),
};
let task = self.tasks.remove(&task_id);

if task_status.get().is_ready() {
return MmError::err(RpcTaskError::UnexpectedTaskStatus {
task_id,
actual: TaskStatusError::Finished,
expected: TaskStatusError::InProgress,
});
match task {
Some(finished_task @ TaskStatusExt::Ok(_) | finished_task @ TaskStatusExt::Error(_)) => {
// Return the task result to the `tasks` container.
self.tasks.insert(task_id, finished_task);
unexpected_task_status!(task_id, actual = Finished, expected = InProgress)
},
Some(TaskStatusExt::InProgress { .. }) => {
let new_task = TaskStatusExt::Cancelling { _action_sender: None };
self.tasks.insert(task_id, new_task);
Ok(())
},
Some(TaskStatusExt::Awaiting { action_sender, .. }) => {
let new_task = TaskStatusExt::Cancelling {
_action_sender: Some(action_sender),
};
self.tasks.insert(task_id, new_task);
Ok(())
},
Some(cancelling_task @ TaskStatusExt::Cancelling { .. }) => {
// Return the task result to the `tasks` container.
self.tasks.insert(task_id, cancelling_task);
unexpected_task_status!(task_id, actual = Cancelled, expected = InProgress)
},
None => MmError::err(RpcTaskError::NoSuchTask(task_id)),
}

// The task will be aborted when the `TaskAbortHandle` is dropped.
task_status.remove();
Ok(())
}

pub(crate) fn register_task(
Expand All @@ -120,11 +145,7 @@ impl<Task: RpcTask> RpcTaskManager<Task> {
let task_id = next_rpc_task_id();
let (abort_handle, abort_handler) = oneshot::channel();
match self.tasks.entry(task_id) {
Entry::Occupied(_entry) => MmError::err(RpcTaskError::UnexpectedTaskStatus {
task_id,
actual: TaskStatusError::InProgress,
expected: TaskStatusError::Idle,
}),
Entry::Occupied(_entry) => unexpected_task_status!(task_id, actual = InProgress, expected = Idle),
Entry::Vacant(entry) => {
entry.insert(TaskStatusExt::InProgress {
status: task_initial_in_progress_status,
Expand All @@ -147,6 +168,16 @@ impl<Task: RpcTask> RpcTaskManager<Task> {
}
}

pub(crate) fn on_task_cancelling_finished(&mut self, task_id: TaskId) -> RpcTaskResult<()> {
match self.tasks.remove(&task_id) {
Some(TaskStatusExt::Cancelling { .. }) => Ok(()),
_ => {
let error = format!("Cancelled task '{task_id}' was not in `Cancelling` status");
MmError::err(RpcTaskError::Internal(error))
},
}
}

fn on_task_finished(
&mut self,
task_id: TaskId,
Expand All @@ -157,7 +188,8 @@ impl<Task: RpcTask> RpcTaskManager<Task> {
Err(error) => TaskStatusExt::Error(error),
};
if self.tasks.insert(task_id, task_status).is_none() {
warn!("Finished task '{}' was not ongoing", task_id);
let error = format!("Finished task '{task_id}' was not ongoing");
return MmError::err(RpcTaskError::Internal(error));
}
Ok(())
}
Expand All @@ -171,14 +203,15 @@ impl<Task: RpcTask> RpcTaskManager<Task> {
.insert(task_id, TaskStatusExt::InProgress { status, abort_handle });
Ok(())
},
Some(cancelling @ TaskStatusExt::Cancelling { .. }) => {
// Return the task result to the tasks container.
self.tasks.insert(task_id, cancelling);
unexpected_task_status!(task_id, actual = Cancelled, expected = InProgress)
},
Some(ready @ TaskStatusExt::Ok(_) | ready @ TaskStatusExt::Error(_)) => {
// Return the task result to the tasks container.
self.tasks.insert(task_id, ready);
MmError::err(RpcTaskError::UnexpectedTaskStatus {
task_id,
actual: TaskStatusError::Finished,
expected: TaskStatusError::InProgress,
})
unexpected_task_status!(task_id, actual = Finished, expected = InProgress)
},
None => MmError::err(RpcTaskError::NoSuchTask(task_id)),
}
Expand Down Expand Up @@ -233,7 +266,7 @@ impl<Task: RpcTask> RpcTaskManager<Task> {
let result = action_sender
.send(user_action)
// The task seems to be canceled/aborted for some reason.
.map_to_mm(|_user_action| RpcTaskError::Canceled);
.map_to_mm(|_user_action| RpcTaskError::Cancelled);
// Insert new in-progress status to the tasks container.
self.tasks
.insert(task_id, TaskStatusExt::InProgress { status, abort_handle });
Expand Down Expand Up @@ -272,16 +305,21 @@ enum TaskStatusExt<Task: RpcTaskTypes> {
abort_handle: TaskAbortHandle,
next_in_progress_status: Task::InProgressStatus,
},
/// `Cancelling` status is set on [`RpcTaskManager::cancel_task`].
/// This status is used to save the task state before it's actually canceled on [`RpcTaskHandle::on_canceled`],
/// in particular, to fix https://github.com/KomodoPlatform/atomicDEX-API/issues/1580.
Cancelling {
_action_sender: Option<UserActionSender<Task::UserAction>>,
},
}

impl<Task: RpcTaskTypes> TaskStatusExt<Task> {
fn is_ready(&self) -> bool { matches!(self, TaskStatusExt::Ok(_) | TaskStatusExt::Error(_)) }

fn task_status_err(&self) -> TaskStatusError {
match self {
TaskStatusExt::Ok(_) | TaskStatusExt::Error(_) => TaskStatusError::Finished,
TaskStatusExt::InProgress { .. } => TaskStatusError::InProgress,
TaskStatusExt::Awaiting { .. } => TaskStatusError::AwaitingUserAction,
TaskStatusExt::Cancelling { .. } => TaskStatusError::Cancelled,
}
}
}