Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

avoid waiting for all EVM nodes to sync the latest nonce #1757

Merged
merged 3 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## ${next-version} - ${release-date}

**Features:**

**Enhancements/Fixes:**
- An issue was fixed where we don't have to wait for all EVM nodes to sync the latest account nonce [#1757](https://github.com/KomodoPlatform/atomicDEX-API/pull/1757)

## v1.0.2-beta - 2023-04-11

**Features:**
Expand Down
73 changes: 40 additions & 33 deletions mm2src/coins/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use ethereum_types::{Address, H160, H256, U256};
use ethkey::{public_to_address, KeyPair, Public, Signature};
use ethkey::{sign, verify_address};
use futures::compat::Future01CompatExt;
use futures::future::{join_all, Either, FutureExt, TryFutureExt};
use futures::future::{join_all, select_ok, Either, FutureExt, TryFutureExt};
use futures01::Future;
use http::StatusCode;
use mm2_core::mm_ctx::{MmArc, MmWeak};
Expand Down Expand Up @@ -775,7 +775,7 @@ async fn withdraw_impl(coin: EthCoin, req: WithdrawRequest) -> WithdrawResult {
let (tx_hash, tx_hex) = match coin.priv_key_policy {
EthPrivKeyPolicy::KeyPair(ref key_pair) => {
let _nonce_lock = coin.nonce_lock.lock().await;
let nonce = get_addr_nonce(coin.my_address, coin.web3_instances.clone())
let (nonce, _) = get_addr_nonce(coin.my_address, coin.web3_instances.clone())
.compat()
.timeout_secs(30.)
.await?
Expand Down Expand Up @@ -924,7 +924,7 @@ pub async fn withdraw_erc1155(ctx: MmArc, req: WithdrawErc1155) -> WithdrawNftRe
let (gas, gas_price) =
get_eth_gas_details(&eth_coin, req.fee, eth_value, data.clone().into(), call_addr, false).await?;
let _nonce_lock = eth_coin.nonce_lock.lock().await;
let nonce = get_addr_nonce(eth_coin.my_address, eth_coin.web3_instances.clone())
let (nonce, _) = get_addr_nonce(eth_coin.my_address, eth_coin.web3_instances.clone())
.compat()
.timeout_secs(30.)
.await?
Expand Down Expand Up @@ -992,7 +992,7 @@ pub async fn withdraw_erc721(ctx: MmArc, req: WithdrawErc721) -> WithdrawNftResu
let (gas, gas_price) =
get_eth_gas_details(&eth_coin, req.fee, eth_value, data.clone().into(), call_addr, false).await?;
let _nonce_lock = eth_coin.nonce_lock.lock().await;
let nonce = get_addr_nonce(eth_coin.my_address, eth_coin.web3_instances.clone())
let (nonce, _) = get_addr_nonce(eth_coin.my_address, eth_coin.web3_instances.clone())
.compat()
.timeout_secs(30.)
.await?
Expand Down Expand Up @@ -2005,7 +2005,7 @@ async fn sign_and_send_transaction_with_keypair(
}
let _nonce_lock = coin.nonce_lock.lock().await;
status.status(tags!(), "get_addr_nonce…");
let nonce = try_tx_s!(
let (nonce, web3_instances_with_latest_nonce) = try_tx_s!(
get_addr_nonce(coin.my_address, coin.web3_instances.clone())
.compat()
.await
Expand All @@ -2026,14 +2026,10 @@ async fn sign_and_send_transaction_with_keypair(
let bytes = Bytes(rlp::encode(&signed).to_vec());
status.status(tags!(), "send_raw_transaction…");

try_tx_s!(
coin.web3
.eth()
.send_raw_transaction(bytes)
.await
.map_err(|e| ERRL!("{}", e)),
signed
);
let futures = web3_instances_with_latest_nonce
.into_iter()
.map(|web3_instance| web3_instance.web3.eth().send_raw_transaction(bytes.clone()));
try_tx_s!(select_ok(futures).await.map_err(|e| ERRL!("{}", e)), signed);

status.status(tags!(), "get_addr_nonce…");
coin.wait_for_addr_nonce_increase(coin.my_address, nonce).await;
Expand Down Expand Up @@ -4012,7 +4008,7 @@ impl EthCoin {
Box::new(fut.boxed().compat())
}

/// Checks every second till ETH nodes recognize that nonce is increased.
/// Checks every second till at least one ETH node recognizes that nonce is increased.
/// Parity has reliable "nextNonce" method that always returns correct nonce for address.
/// But we can't expect that all nodes will always be Parity.
/// Some of ETH forks use Geth only so they don't have Parity nodes at all.
Expand All @@ -4025,8 +4021,8 @@ impl EthCoin {
async fn wait_for_addr_nonce_increase(&self, addr: Address, prev_nonce: U256) {
repeatable!(async {
match get_addr_nonce(addr, self.web3_instances.clone()).compat().await {
Ok(new_nonce) if new_nonce > prev_nonce => Ready(()),
Ok(_nonce) => Retry(()),
Ok((new_nonce, _)) if new_nonce > prev_nonce => Ready(()),
Ok((_nonce, _)) => Retry(()),
Err(e) => {
error!("Error getting {} {} nonce: {}", self.ticker(), self.my_address, e);
Retry(())
Expand Down Expand Up @@ -5015,31 +5011,37 @@ fn checksum_address(addr: &str) -> String {
/// The input must be 0x prefixed hex string
fn is_valid_checksum_addr(addr: &str) -> bool { addr == checksum_address(addr) }

/// Requests the nonce from all available nodes and checks that returned results equal.
/// Nodes might need some time to sync and there can be other coins that use same nodes in different order.
/// We need to be sure that nonce is updated on all of them before and after transaction is sent.
/// Requests the nonce from all available nodes and returns the highest nonce available with the list of nodes that returned the highest nonce.
/// Transactions will be sent using the nodes that returned the highest nonce.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means we will trust the result from a single node now. Do you think it can cause any problems?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already sent the transaction to a single node before this PR, we now make sure that the node that we send the transaction to is up to date, so I don't think we will have more problems than we already did. The idea was that swap operations can continue to work even if only one node is up to date and all other nodes are down or outdated, this eliminates the case that happened recently where a taker couldn't spend a swap transaction because one node was not working right.

Alternatively, I can add a 60s delay for the nodes to reach consensus/same state or nonce, if they didn't, we use only the nodes that are up to date as it's done here and log an error for the nodes that are not up to date or take time to sync, what do you think @caglaryucekaya?

In the end I don't think this is a final solution, there is no best solution for the nonce problem in EVM transactions, some wallets (e.g. metamask) save the nonce value and increase it internally, but this not ideal if a transaction is sent from another wallet by the user (if the user uses the same seed in multiple wallets). Other solutions is to wait for confirmation of a transaction before sending another, I believe some EVM DEXs do that to avoid such problems.

The best solution to avoid trusting a single node is to send the transaction to all nodes but this has a higher cost than sending it to only one node.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, I can add a 60s delay for the nodes to reach consensus/same state or nonce, if they didn't, we use only the nodes that are up to date as it's done here and log an error for the nodes that are not up to date or take time to sync, what do you think @caglaryucekaya?

If we're going to accept the largest nonce in the end, it wouldn't bring anything to wait for the others for 60s. I was thinking of a case where the largest nonce result returned is incorrect, but we can ignore that since it wouldn't happen.

#[cfg_attr(test, mockable)]
fn get_addr_nonce(addr: Address, web3s: Vec<Web3Instance>) -> Box<dyn Future<Item = U256, Error = String> + Send> {
fn get_addr_nonce(
addr: Address,
web3s: Vec<Web3Instance>,
) -> Box<dyn Future<Item = (U256, Vec<Web3Instance>), Error = String> + Send> {
let fut = async move {
let mut errors: u32 = 0;
loop {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can change this with a repeatable future since we started to work on this function

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, it can be done. Will have to change the function to async to do that.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know about this, then it can stay as it is

let futures: Vec<_> = web3s
let (futures, web3s): (Vec<_>, Vec<_>) = web3s
.iter()
.map(|web3| {
if web3.is_parity {
let parity: ParityNonce<_> = web3.web3.api();
Either::Left(parity.parity_next_nonce(addr))
(Either::Left(parity.parity_next_nonce(addr)), web3.clone())
} else {
Either::Right(web3.web3.eth().transaction_count(addr, Some(BlockNumber::Pending)))
(
Either::Right(web3.web3.eth().transaction_count(addr, Some(BlockNumber::Pending))),
web3.clone(),
)
}
})
.collect();
.unzip();

let nonces: Vec<_> = join_all(futures)
.await
.into_iter()
.filter_map(|nonce_res| match nonce_res {
Ok(n) => Some(n),
.zip(web3s.into_iter())
.filter_map(|(nonce_res, web3)| match nonce_res {
Ok(n) => Some((n, web3)),
Err(e) => {
error!("Error getting nonce for addr {:?}: {}", addr, e);
None
Expand All @@ -5053,13 +5055,18 @@ fn get_addr_nonce(addr: Address, web3s: Vec<Web3Instance>) -> Box<dyn Future<Ite
return ERR!("Couldn't get nonce after 5 errored attempts, aborting");
}
} else {
let max = nonces.iter().max().unwrap();
let min = nonces.iter().min().unwrap();
if max == min {
return Ok(*max);
} else {
warn!("Max nonce {} != {} min nonce", max, min);
}
let max = nonces
.iter()
.map(|(n, _)| *n)
.max()
.expect("nonces should not be empty!");
break Ok((
max,
nonces
.into_iter()
.filter_map(|(n, web3)| if n == max { Some(web3) } else { None })
.collect(),
));
}
Timer::sleep(1.).await
}
Expand Down
4 changes: 2 additions & 2 deletions mm2src/coins/eth/eth_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ fn test_withdraw_impl_manual_fee() {
let balance = wei_from_big_decimal(&1000000000.into(), 18).unwrap();
MockResult::Return(Box::new(futures01::future::ok(balance)))
});
get_addr_nonce.mock_safe(|_, _| MockResult::Return(Box::new(futures01::future::ok(0.into()))));
get_addr_nonce.mock_safe(|_, _| MockResult::Return(Box::new(futures01::future::ok((0.into(), vec![])))));

let withdraw_req = WithdrawRequest {
amount: 1.into(),
Expand Down Expand Up @@ -740,7 +740,7 @@ fn test_withdraw_impl_fee_details() {
let balance = wei_from_big_decimal(&1000000000.into(), 18).unwrap();
MockResult::Return(Box::new(futures01::future::ok(balance)))
});
get_addr_nonce.mock_safe(|_, _| MockResult::Return(Box::new(futures01::future::ok(0.into()))));
get_addr_nonce.mock_safe(|_, _| MockResult::Return(Box::new(futures01::future::ok((0.into(), vec![])))));

let withdraw_req = WithdrawRequest {
amount: 1.into(),
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_main/tests/mm2_tests/mm2_tests_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ fn trade_test_electrum_and_eth_coins() {
let bob_policy = Mm2InitPrivKeyPolicy::Iguana;
let alice_policy = Mm2InitPrivKeyPolicy::GlobalHDAccount(0);
let pairs = &[("ETH", "JST")];
block_on(trade_base_rel_electrum(bob_policy, alice_policy, pairs, 1., 2., 0.1));
block_on(trade_base_rel_electrum(bob_policy, alice_policy, pairs, 1., 2., 0.01));
}

#[test]
Expand Down