Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[r2r] Stop spawned futures #1490

Merged
merged 32 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
53e3de7
Wrap lightning futures into `AbortOnDropHandle`
sergeyboyko0791 Sep 26, 2022
43df80b
Refactor `common::executor` by moving to a separate module
sergeyboyko0791 Sep 27, 2022
cc04c31
Add `AbortableSpawner` that has `spawn` and `spawn_critical`
sergeyboyko0791 Sep 28, 2022
132808c
Spawn TX history with `CoinFutureSpawner::spawn`
sergeyboyko0791 Sep 29, 2022
414fa1d
Spawn electrum client with `CoinFutureSpawner`
sergeyboyko0791 Sep 30, 2022
a2b9306
Merge branch 'dev' into stop-spawned-futures
sergeyboyko0791 Sep 30, 2022
09ffab5
Use `CoinFutureSpawner` within `UtxoArcBuilder`
sergeyboyko0791 Sep 30, 2022
3511dc2
Refactor `AbortableSpawner`
sergeyboyko0791 Oct 2, 2022
95ec297
Optimize `SpawnedFutured` by replacing HashMap with Vec
sergeyboyko0791 Oct 2, 2022
5d74e9b
Continue integrating `MmSpawner`
sergeyboyko0791 Oct 2, 2022
d0de4ce
Refactor `ws_transport` to avoid unnecessary `spawn` calls
sergeyboyko0791 Oct 2, 2022
595060b
Integrate `FutureSpawner` into RPC task manager
sergeyboyko0791 Oct 2, 2022
c2a3b2a
Make `spawn` and `spawn_local` unsafe
sergeyboyko0791 Oct 3, 2022
7771e79
Make sure `AbortableSpawner` can't be cloned
sergeyboyko0791 Oct 4, 2022
155e972
Remove `SWARM_RUNTIME` from `mm2_libp2p`
sergeyboyko0791 Oct 4, 2022
cf5276f
Don't use `MmArc::on_stop`
sergeyboyko0791 Oct 4, 2022
1dabf4f
Don't use `MmArc::on_stop` when `spawn_gossipsub`
sergeyboyko0791 Oct 4, 2022
4fa1570
Refactor `AbortableSpawner` to `AbortableSystem`
sergeyboyko0791 Oct 6, 2022
17554a1
Spawn `check_balance_update_loop` future via `AbortableSimpleMap` sub…
sergeyboyko0791 Oct 6, 2022
2ff04ca
Ignore unstable tests
sergeyboyko0791 Oct 6, 2022
62b6613
Add `test_mm2_stops_immediately` WASM test
sergeyboyko0791 Oct 7, 2022
42f08f9
Fix `test_mm2_stops_immediately` by removing `QRC20` coin from test
sergeyboyko0791 Oct 7, 2022
9c06867
Minor changes and fixes
sergeyboyko0791 Oct 7, 2022
873d9a6
Merge branch 'dev' into stop-spawned-futures
sergeyboyko0791 Oct 11, 2022
4531dc3
Refactor `ElectrumClient` not to use `spawn` function
sergeyboyko0791 Oct 12, 2022
ce8859d
Make `spawn` and `spawn_local` safe
sergeyboyko0791 Oct 12, 2022
be674da
Minor changes
sergeyboyko0791 Oct 13, 2022
186e00a
Add `GracefulShutdownRegistry` and use it for hyper server
sergeyboyko0791 Oct 13, 2022
a199637
Fix BCH, SLP tests
sergeyboyko0791 Oct 13, 2022
522edd9
Rename `QueueInner::remove_finished` to `on_finished`
sergeyboyko0791 Oct 13, 2022
de59cb6
Remove `MmCtx::stop_listeners`, `MmCtx::on_stop`
sergeyboyko0791 Oct 13, 2022
c904c63
Spawn `save_channel_closing_details` with a critical timeout
sergeyboyko0791 Oct 13, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 21 additions & 11 deletions mm2src/coins/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//
use async_trait::async_trait;
use bitcrypto::{keccak256, sha256};
use common::executor::Timer;
use common::executor::{abortable_queue::AbortableQueue, AbortableSystem, Timer};
use common::log::{error, info, warn};
use common::{get_utc_timestamp, now_ms, small_rng, DEX_FEE_ADDR_RAW_PUBKEY};
use crypto::privkey::key_pair_from_secret;
Expand Down Expand Up @@ -59,16 +59,16 @@ use web3::types::{Action as TraceAction, BlockId, BlockNumber, Bytes, CallReques
use web3::{self, Web3};
use web3_transport::{EthFeeHistoryNamespace, Web3Transport, Web3TransportNode};

use super::{coin_conf, AsyncMutex, BalanceError, BalanceFut, CoinBalance, CoinProtocol, CoinTransportMetrics,
CoinsContext, FeeApproxStage, FoundSwapTxSpend, HistorySyncState, MarketCoinOps, MmCoin, MyAddressError,
NegotiateSwapContractAddrErr, NumConversError, NumConversResult, RawTransactionError, RawTransactionFut,
RawTransactionRequest, RawTransactionRes, RawTransactionResult, RpcClientType, RpcTransportEventHandler,
RpcTransportEventHandlerShared, SearchForSwapTxSpendInput, SignatureError, SignatureResult, SwapOps,
TradeFee, TradePreimageError, TradePreimageFut, TradePreimageResult, TradePreimageValue, Transaction,
TransactionDetails, TransactionEnum, TransactionErr, TransactionFut, TxMarshalingErr,
UnexpectedDerivationMethod, ValidateAddressResult, ValidatePaymentError, ValidatePaymentFut,
ValidatePaymentInput, VerificationError, VerificationResult, WatcherValidatePaymentInput, WithdrawError,
WithdrawFee, WithdrawFut, WithdrawRequest, WithdrawResult};
use super::{coin_conf, AsyncMutex, BalanceError, BalanceFut, CoinBalance, CoinFutSpawner, CoinProtocol,
CoinTransportMetrics, CoinsContext, FeeApproxStage, FoundSwapTxSpend, HistorySyncState, MarketCoinOps,
MmCoin, MyAddressError, NegotiateSwapContractAddrErr, NumConversError, NumConversResult,
RawTransactionError, RawTransactionFut, RawTransactionRequest, RawTransactionRes, RawTransactionResult,
RpcClientType, RpcTransportEventHandler, RpcTransportEventHandlerShared, SearchForSwapTxSpendInput,
SignatureError, SignatureResult, SwapOps, TradeFee, TradePreimageError, TradePreimageFut,
TradePreimageResult, TradePreimageValue, Transaction, TransactionDetails, TransactionEnum, TransactionErr,
TransactionFut, TxMarshalingErr, UnexpectedDerivationMethod, ValidateAddressResult, ValidatePaymentError,
ValidatePaymentFut, ValidatePaymentInput, VerificationError, VerificationResult,
WatcherValidatePaymentInput, WithdrawError, WithdrawFee, WithdrawFut, WithdrawRequest, WithdrawResult};

pub use rlp;

Expand Down Expand Up @@ -314,6 +314,9 @@ pub struct EthCoinImpl {
logs_block_range: u64,
nonce_lock: Arc<AsyncMutex<()>>,
erc20_tokens_infos: Arc<Mutex<HashMap<String, Erc20TokenInfo>>>,
/// This spawner is used to spawn coin's related futures that should be aborted on coin deactivation
/// and on [`MmArc::stop`].
abortable_system: AbortableQueue,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -3076,6 +3079,8 @@ impl EthTxFeeDetails {
impl MmCoin for EthCoin {
fn is_asset_chain(&self) -> bool { false }

fn spawner(&self) -> CoinFutSpawner { CoinFutSpawner::new(&self.abortable_system) }

fn get_raw_transaction(&self, req: RawTransactionRequest) -> RawTransactionFut {
Box::new(get_raw_transaction_impl(self.clone(), req).boxed().compat())
}
Expand Down Expand Up @@ -3631,6 +3636,10 @@ pub async fn eth_coin_from_conf_and_request(

let nonce_lock = map.entry(key_lock).or_insert_with(new_nonce_lock).clone();

// Create an abortable system linked to the `MmCtx` so if the context is stopped via `MmArc::stop`,
// all spawned futures related to `ETH` coin will be aborted as well.
let abortable_system = ctx.abortable_system.create_subsystem();

let coin = EthCoinImpl {
key_pair,
my_address,
Expand All @@ -3652,6 +3661,7 @@ pub async fn eth_coin_from_conf_and_request(
logs_block_range: conf["logs_block_range"].as_u64().unwrap_or(DEFAULT_LOGS_BLOCK_RANGE),
nonce_lock,
erc20_tokens_infos: Default::default(),
abortable_system,
};
Ok(EthCoin(Arc::new(coin)))
}
Expand Down
13 changes: 13 additions & 0 deletions mm2src/coins/eth/eth_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ fn eth_coin_for_test(
logs_block_range: DEFAULT_LOGS_BLOCK_RANGE,
nonce_lock: new_nonce_lock(),
erc20_tokens_infos: Default::default(),
abortable_system: AbortableQueue::default(),
}));
(ctx, eth_coin)
}
Expand Down Expand Up @@ -244,6 +245,7 @@ fn send_and_refund_erc20_payment() {
logs_block_range: DEFAULT_LOGS_BLOCK_RANGE,
nonce_lock: new_nonce_lock(),
erc20_tokens_infos: Default::default(),
abortable_system: AbortableQueue::default(),
}));

let payment = coin
Expand Down Expand Up @@ -312,6 +314,7 @@ fn send_and_refund_eth_payment() {
logs_block_range: DEFAULT_LOGS_BLOCK_RANGE,
nonce_lock: new_nonce_lock(),
erc20_tokens_infos: Default::default(),
abortable_system: AbortableQueue::default(),
}));

let payment = coin
Expand Down Expand Up @@ -397,6 +400,7 @@ fn test_nonce_several_urls() {
logs_block_range: DEFAULT_LOGS_BLOCK_RANGE,
nonce_lock: new_nonce_lock(),
erc20_tokens_infos: Default::default(),
abortable_system: AbortableQueue::default(),
}));

log!("My address {:?}", coin.my_address);
Expand Down Expand Up @@ -447,6 +451,7 @@ fn test_wait_for_payment_spend_timeout() {
logs_block_range: DEFAULT_LOGS_BLOCK_RANGE,
nonce_lock: new_nonce_lock(),
erc20_tokens_infos: Default::default(),
abortable_system: AbortableQueue::default(),
};

let coin = EthCoin(Arc::new(coin));
Expand All @@ -473,6 +478,8 @@ fn test_wait_for_payment_spend_timeout() {
}

#[test]
// TODO unignore
#[ignore]
fn test_search_for_swap_tx_spend_was_spent() {
let key_pair = KeyPair::from_secret_slice(
&hex::decode("809465b17d0a4ddb3e4c69e8f23c2cabad868f51f8bed5c765ad1d6516c3306f").unwrap(),
Expand Down Expand Up @@ -507,6 +514,7 @@ fn test_search_for_swap_tx_spend_was_spent() {
logs_block_range: DEFAULT_LOGS_BLOCK_RANGE,
nonce_lock: new_nonce_lock(),
erc20_tokens_infos: Default::default(),
abortable_system: AbortableQueue::default(),
}));

// raw transaction bytes of https://ropsten.etherscan.io/tx/0xb1c987e2ac79581bb8718267b5cb49a18274890494299239d1d0dfdb58d6d76a
Expand Down Expand Up @@ -577,6 +585,8 @@ fn test_gas_station() {
}

#[test]
// TODO unignore
#[ignore]
fn test_search_for_swap_tx_spend_was_refunded() {
let key_pair = KeyPair::from_secret_slice(
&hex::decode("809465b17d0a4ddb3e4c69e8f23c2cabad868f51f8bed5c765ad1d6516c3306f").unwrap(),
Expand Down Expand Up @@ -614,6 +624,7 @@ fn test_search_for_swap_tx_spend_was_refunded() {
logs_block_range: DEFAULT_LOGS_BLOCK_RANGE,
nonce_lock: new_nonce_lock(),
erc20_tokens_infos: Default::default(),
abortable_system: AbortableQueue::default(),
}));

// raw transaction bytes of https://ropsten.etherscan.io/tx/0xe18bbca69dea9a4624e1f5b0b2021d5fe4c8daa03f36084a8ba011b08e5cd938
Expand Down Expand Up @@ -1288,6 +1299,7 @@ fn test_message_hash() {
logs_block_range: DEFAULT_LOGS_BLOCK_RANGE,
nonce_lock: new_nonce_lock(),
erc20_tokens_infos: Default::default(),
abortable_system: AbortableQueue::default(),
}));

let message_hash = coin.sign_message_hash("test").unwrap();
Expand Down Expand Up @@ -1331,6 +1343,7 @@ fn test_sign_verify_message() {
logs_block_range: DEFAULT_LOGS_BLOCK_RANGE,
nonce_lock: new_nonce_lock(),
erc20_tokens_infos: Default::default(),
abortable_system: AbortableQueue::default(),
}));

let message = "test";
Expand Down
1 change: 1 addition & 0 deletions mm2src/coins/eth/eth_wasm_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ async fn test_send() {
logs_block_range: DEFAULT_LOGS_BLOCK_RANGE,
nonce_lock: new_nonce_lock(),
erc20_tokens_infos: Default::default(),
abortable_system: AbortableQueue::default(),
}));
let tx = coin
.send_maker_payment(
Expand Down
10 changes: 10 additions & 0 deletions mm2src/coins/eth/v2_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ impl EthCoin {
.unwrap_or_else(|| conf["required_confirmations"].as_u64().unwrap_or(1))
.into();

// Create an abortable system linked to the `MmCtx` so if the app is stopped on `MmArc::stop`,
// all spawned futures related to `ERC20` coin will be aborted as well.
let abortable_system = ctx.abortable_system.create_subsystem();

let token = EthCoinImpl {
key_pair: self.key_pair.clone(),
my_address: self.my_address,
Expand All @@ -115,6 +119,7 @@ impl EthCoin {
logs_block_range: self.logs_block_range,
nonce_lock: self.nonce_lock.clone(),
erc20_tokens_infos: Default::default(),
abortable_system,
};

Ok(EthCoin(Arc::new(token)))
Expand Down Expand Up @@ -227,6 +232,10 @@ pub async fn eth_coin_from_conf_and_request_v2(
let mut map = NONCE_LOCK.lock().unwrap();
let nonce_lock = map.entry(ticker.to_string()).or_insert_with(new_nonce_lock).clone();

// Create an abortable system linked to the `MmCtx` so if the app is stopped on `MmArc::stop`,
// all spawned futures related to `ETH` coin will be aborted as well.
let abortable_system = ctx.abortable_system.create_subsystem();

let coin = EthCoinImpl {
key_pair: key_pair.clone(),
my_address: key_pair.address(),
Expand All @@ -248,6 +257,7 @@ pub async fn eth_coin_from_conf_and_request_v2(
logs_block_range: conf["logs_block_range"].as_u64().unwrap_or(DEFAULT_LOGS_BLOCK_RANGE),
nonce_lock,
erc20_tokens_infos: Default::default(),
abortable_system,
};

Ok(EthCoin(Arc::new(coin)))
Expand Down
28 changes: 16 additions & 12 deletions mm2src/coins/lightning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,27 @@ use super::{lp_coinfind_or_err, DerivationMethod, MmCoinEnum};
use crate::coin_errors::{MyAddressError, ValidatePaymentError};
use crate::lightning::ln_conf::OurChannelsConfigs;
use crate::lightning::ln_errors::{TrustedNodeError, TrustedNodeResult, UpdateChannelError, UpdateChannelResult};
use crate::lightning::ln_events::init_events_abort_handlers;
use crate::lightning::ln_events::init_abortable_events;
use crate::lightning::ln_serialization::PublicKeyForRPC;
use crate::lightning::ln_sql::SqliteLightningDB;
use crate::lightning::ln_storage::{NetworkGraph, TrustedNodesShared};
use crate::utxo::rpc_clients::UtxoRpcClientEnum;
use crate::utxo::utxo_common::{big_decimal_from_sat_unsigned, UtxoTxBuilder};
use crate::utxo::{sat_from_big_decimal, BlockchainNetwork, FeePolicy, GetUtxoListOps, UtxoTxGenerationOps};
use crate::{BalanceFut, CoinBalance, FeeApproxStage, FoundSwapTxSpend, HistorySyncState, MarketCoinOps, MmCoin,
NegotiateSwapContractAddrErr, RawTransactionFut, RawTransactionRequest, SearchForSwapTxSpendInput,
SignatureError, SignatureResult, SwapOps, TradeFee, TradePreimageFut, TradePreimageResult,
TradePreimageValue, TransactionEnum, TransactionFut, TxMarshalingErr, UnexpectedDerivationMethod,
UtxoStandardCoin, ValidateAddressResult, ValidatePaymentFut, ValidatePaymentInput, VerificationError,
VerificationResult, WatcherValidatePaymentInput, WithdrawError, WithdrawFut, WithdrawRequest};
use crate::{BalanceFut, CoinBalance, CoinFutSpawner, FeeApproxStage, FoundSwapTxSpend, HistorySyncState,
MarketCoinOps, MmCoin, NegotiateSwapContractAddrErr, RawTransactionFut, RawTransactionRequest,
SearchForSwapTxSpendInput, SignatureError, SignatureResult, SwapOps, TradeFee, TradePreimageFut,
TradePreimageResult, TradePreimageValue, TransactionEnum, TransactionFut, TxMarshalingErr,
UnexpectedDerivationMethod, UtxoStandardCoin, ValidateAddressResult, ValidatePaymentFut,
ValidatePaymentInput, VerificationError, VerificationResult, WatcherValidatePaymentInput, WithdrawError,
WithdrawFut, WithdrawRequest};
use async_trait::async_trait;
use bitcoin::hashes::Hash;
use bitcoin_hashes::sha256::Hash as Sha256;
use bitcrypto::dhash256;
use bitcrypto::ChecksumType;
use chain::TransactionOutput;
use common::executor::spawn;
use common::executor::SpawnFuture;
use common::log::{error, LogOnError, LogState};
use common::{async_blocking, calc_total_pages, log, now_ms, ten, PagingOptionsEnum};
use futures::{FutureExt, TryFutureExt};
Expand Down Expand Up @@ -557,6 +558,8 @@ impl MarketCoinOps for LightningCoin {
impl MmCoin for LightningCoin {
fn is_asset_chain(&self) -> bool { false }

fn spawner(&self) -> CoinFutSpawner { CoinFutSpawner::new(&self.platform.abortable_system) }

fn get_raw_transaction(&self, req: RawTransactionRequest) -> RawTransactionFut {
Box::new(self.platform_coin().get_raw_transaction(req))
}
Expand Down Expand Up @@ -717,6 +720,7 @@ pub async fn start_lightning(
// Initialize the PeerManager
let peer_manager = ln_p2p::init_peer_manager(
ctx.clone(),
&platform,
params.listening_port,
channel_manager.clone(),
gossip_sync.clone(),
Expand All @@ -729,7 +733,7 @@ pub async fn start_lightning(

let trusted_nodes = Arc::new(PaMutex::new(persister.get_trusted_nodes().await?));

let events_abort_handlers = init_events_abort_handlers(platform.clone(), db.clone()).await?;
init_abortable_events(platform.clone(), db.clone()).await?;

// Initialize the event handler
let event_handler = Arc::new(ln_events::LightningEventHandler::new(
Expand All @@ -738,7 +742,6 @@ pub async fn start_lightning(
keys_manager.clone(),
db.clone(),
trusted_nodes.clone(),
events_abort_handlers,
));

// Initialize routing Scorer
Expand Down Expand Up @@ -785,13 +788,14 @@ pub async fn start_lightning(
let open_channels_nodes = Arc::new(PaMutex::new(
ln_utils::get_open_channels_nodes_addresses(persister.clone(), channel_manager.clone()).await?,
));
spawn(ln_p2p::connect_to_nodes_loop(

platform.spawner().spawn(ln_p2p::connect_to_nodes_loop(
open_channels_nodes.clone(),
peer_manager.clone(),
));

// Broadcast Node Announcement
spawn(ln_p2p::ln_node_announcement_loop(
platform.spawner().spawn(ln_p2p::ln_node_announcement_loop(
channel_manager.clone(),
params.node_name,
params.node_color,
Expand Down
Loading