Skip to content

Commit

Permalink
[r2r] Tendermint multiple rpcs optimization (#1568)
Browse files Browse the repository at this point in the history
* branch from dev

* trait RpcCommonOps

* import HttpTransportNode after dev merge

* wip

* small changes in errors

* wasm tests

* use String in RpcClientInitError

* iterate_over_urls func

* wip

* use iterate_over_urls func in get_rpc_client func. need to handle error prop better

* wip

* wip

* leave WrongRpcClient in TendermintCoinRpcError

* add RpcClientError for match

* remove some notes

* fmt

* save dev state, some notes added

* wip

* wip

* wip

* impl RpcCommonOps for TendermintCoin, use just AsyncMutex

* remove RpcCommonError and RpcClientEnum, use types in trait instead

* use retain to filter valid urls

* add some notes

* return error if we have one or more invalid urls

* remove pub and dead_code

* use rotate_right

* add rotate_left, into_iter

* use slice::join,  return MmResult

* add check rpc_urls.is_empty
  • Loading branch information
laruh authored Dec 26, 2022
1 parent 4f1190b commit 47b1a71
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 32 deletions.
10 changes: 10 additions & 0 deletions mm2src/coins/lp_coins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3534,3 +3534,13 @@ where
b.block_height.cmp(&a.block_height)
}
}

/// Use trait in the case, when we have to send requests to rpc client.
#[async_trait]
pub trait RpcCommonOps {
type RpcClient;
type Error;

/// Returns an alive RPC client or returns an error if no RPC endpoint is currently available.
async fn get_live_client(&self) -> Result<Self::RpcClient, Self::Error>;
}
102 changes: 70 additions & 32 deletions mm2src/coins/tendermint/tendermint_coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{big_decimal_from_sat_unsigned, BalanceError, BalanceFut, BigDecimal,
CoinBalance, CoinFutSpawner, FeeApproxStage, FoundSwapTxSpend, HistorySyncState, MarketCoinOps, MmCoin,
NegotiateSwapContractAddrErr, PaymentInstructions, PaymentInstructionsErr, PrivKeyBuildPolicy,
PrivKeyPolicyNotAllowed, RawTransactionError, RawTransactionFut, RawTransactionRequest, RawTransactionRes,
SearchForSwapTxSpendInput, SendMakerPaymentArgs, SendMakerRefundsPaymentArgs,
RpcCommonOps, SearchForSwapTxSpendInput, SendMakerPaymentArgs, SendMakerRefundsPaymentArgs,
SendMakerSpendsTakerPaymentArgs, SendTakerPaymentArgs, SendTakerRefundsPaymentArgs,
SendTakerSpendsMakerPaymentArgs, SignatureError, SignatureResult, SwapOps, TradeFee, TradePreimageError,
TradePreimageFut, TradePreimageResult, TradePreimageValue, TransactionDetails, TransactionEnum,
Expand All @@ -24,7 +24,7 @@ use bitcrypto::{dhash160, sha256};
use common::executor::{abortable_queue::AbortableQueue, AbortableSystem};
use common::executor::{AbortedError, Timer};
use common::log::warn;
use common::{get_utc_timestamp, log, now_ms, Future01CompatExt, DEX_FEE_ADDR_PUBKEY};
use common::{get_utc_timestamp, now_ms, Future01CompatExt, DEX_FEE_ADDR_PUBKEY};
use cosmrs::bank::MsgSend;
use cosmrs::crypto::secp256k1::SigningKey;
use cosmrs::proto::cosmos::auth::v1beta1::{BaseAccount, QueryAccountRequest, QueryAccountResponse};
Expand All @@ -45,6 +45,7 @@ use futures::lock::Mutex as AsyncMutex;
use futures::{FutureExt, TryFutureExt};
use futures01::Future;
use hex::FromHexError;
use itertools::Itertools;
use keys::KeyPair;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
Expand Down Expand Up @@ -164,9 +165,40 @@ impl TendermintConf {
}
}

struct TendermintRpcClient(AsyncMutex<TendermintRpcClientImpl>);

struct TendermintRpcClientImpl {
rpc_clients: Vec<HttpClient>,
}

#[async_trait]
impl RpcCommonOps for TendermintCoin {
type RpcClient = HttpClient;
type Error = TendermintCoinRpcError;

async fn get_live_client(&self) -> Result<Self::RpcClient, Self::Error> {
let mut client_impl = self.client.0.lock().await;
// try to find first live client
for (i, client) in client_impl.rpc_clients.clone().into_iter().enumerate() {
if client
.perform(HealthRequest)
.timeout(Duration::from_secs(3))
.await
.is_ok()
{
// Bring the live client to the front of rpc_clients
client_impl.rpc_clients.rotate_left(i);
return Ok(client);
}
}
return Err(TendermintCoinRpcError::RpcClientError(
"All the current rpc nodes are unavailable.".to_string(),
));
}
}

pub struct TendermintCoinImpl {
ticker: String,
rpc_clients: Vec<HttpClient>,
/// As seconds
avg_blocktime: u8,
/// My address
Expand All @@ -183,6 +215,7 @@ pub struct TendermintCoinImpl {
/// or on [`MmArc::stop`].
pub(super) abortable_system: AbortableQueue,
pub(crate) history_sync_state: Mutex<HistorySyncState>,
client: TendermintRpcClient,
}

#[derive(Clone)]
Expand Down Expand Up @@ -226,6 +259,7 @@ pub enum TendermintCoinRpcError {
Prost(DecodeError),
InvalidResponse(String),
PerformError(String),
RpcClientError(String),
}

impl From<DecodeError> for TendermintCoinRpcError {
Expand All @@ -242,6 +276,7 @@ impl From<TendermintCoinRpcError> for BalanceError {
TendermintCoinRpcError::InvalidResponse(e) => BalanceError::InvalidResponse(e),
TendermintCoinRpcError::Prost(e) => BalanceError::InvalidResponse(e.to_string()),
TendermintCoinRpcError::PerformError(e) => BalanceError::Transport(e),
TendermintCoinRpcError::RpcClientError(e) => BalanceError::Transport(e),
}
}
}
Expand All @@ -252,6 +287,7 @@ impl From<TendermintCoinRpcError> for ValidatePaymentError {
TendermintCoinRpcError::InvalidResponse(e) => ValidatePaymentError::InvalidRpcResponse(e),
TendermintCoinRpcError::Prost(e) => ValidatePaymentError::InvalidRpcResponse(e.to_string()),
TendermintCoinRpcError::PerformError(e) => ValidatePaymentError::Transport(e),
TendermintCoinRpcError::RpcClientError(e) => ValidatePaymentError::Transport(e),
}
}
}
Expand Down Expand Up @@ -381,25 +417,9 @@ impl TendermintCommons for TendermintCoin {
Ok(result)
}

// TODO
// Save one working client to the coin context, only try others once it doesn't
// work anymore.
// Also, try couple times more on health check errors.
#[inline(always)]
async fn rpc_client(&self) -> MmResult<HttpClient, TendermintCoinRpcError> {
for rpc_client in self.rpc_clients.iter() {
match rpc_client.perform(HealthRequest).timeout(Duration::from_secs(3)).await {
Ok(Ok(_)) => return Ok(rpc_client.clone()),
Ok(Err(e)) => log::warn!(
"Recieved error from Tendermint rpc node during health check. Error: {:?}",
e
),
Err(_) => log::warn!("Tendermint rpc node: {:?} got timeout during health check", rpc_client),
};
}

MmError::err(TendermintCoinRpcError::PerformError(
"All the current rpc nodes are unavailable.".to_string(),
))
self.get_live_client().await.map_to_mm(|e| e)
}
}

Expand Down Expand Up @@ -430,17 +450,12 @@ impl TendermintCoin {
}
})?;

let rpc_clients: Result<Vec<HttpClient>, _> = rpc_urls
.iter()
.map(|url| {
HttpClient::new(url.as_str()).map_to_mm(|e| TendermintInitError {
ticker: ticker.clone(),
kind: TendermintInitErrorKind::RpcClientInitError(e.to_string()),
})
})
.collect();
let rpc_clients = clients_from_urls(rpc_urls.as_ref()).mm_err(|kind| TendermintInitError {
ticker: ticker.clone(),
kind,
})?;

let rpc_clients = rpc_clients?;
let client_impl = TendermintRpcClientImpl { rpc_clients };

let chain_id = ChainId::try_from(protocol_info.chain_id).map_to_mm(|e| TendermintInitError {
ticker: ticker.clone(),
Expand Down Expand Up @@ -470,7 +485,6 @@ impl TendermintCoin {

Ok(TendermintCoin(Arc::new(TendermintCoinImpl {
ticker,
rpc_clients,
account_id,
account_prefix: protocol_info.account_prefix,
priv_key: priv_key.to_vec(),
Expand All @@ -483,6 +497,7 @@ impl TendermintCoin {
tokens_info: PaMutex::new(HashMap::new()),
abortable_system,
history_sync_state: Mutex::new(history_sync_state),
client: TendermintRpcClient(AsyncMutex::new(client_impl)),
})))
}

Expand Down Expand Up @@ -1357,6 +1372,29 @@ impl TendermintCoin {
}
}

fn clients_from_urls(rpc_urls: &[String]) -> MmResult<Vec<HttpClient>, TendermintInitErrorKind> {
if rpc_urls.is_empty() {
return MmError::err(TendermintInitErrorKind::EmptyRpcUrls);
}
let mut clients = Vec::new();
let mut errors = Vec::new();
// check that all urls are valid
// keep all invalid urls in one vector to show all of them in error
for url in rpc_urls.iter() {
match HttpClient::new(url.as_str()) {
Ok(client) => clients.push(client),
Err(e) => errors.push(format!("Url {} is invalid, got error {}", url, e)),
}
}
drop_mutability!(clients);
drop_mutability!(errors);
if !errors.is_empty() {
let errors: String = errors.into_iter().join(", ");
return MmError::err(TendermintInitErrorKind::RpcClientInitError(errors));
}
Ok(clients)
}

#[async_trait]
#[allow(unused_variables)]
impl MmCoin for TendermintCoin {
Expand Down

0 comments on commit 47b1a71

Please sign in to comment.