Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[r2r] Tendermint multiple rpcs optimization #1568

Merged
merged 36 commits into from
Dec 26, 2022
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
19dfcd6
branch from dev
laruh Dec 5, 2022
014bc59
trait RpcCommonOps
laruh Dec 5, 2022
040f25f
Merge remote-tracking branch 'origin/dev' into tendermint-multiple-rp…
laruh Dec 5, 2022
c6e6b10
import HttpTransportNode after dev merge
laruh Dec 5, 2022
847072d
wip
laruh Dec 6, 2022
99fcd04
Merge remote-tracking branch 'origin/dev' into tendermint-multiple-rp…
laruh Dec 6, 2022
7f70b63
small changes in errors
laruh Dec 6, 2022
0f26cf9
wasm tests
laruh Dec 6, 2022
5aa99fe
use String in RpcClientInitError
laruh Dec 7, 2022
6e68b6e
iterate_over_urls func
laruh Dec 7, 2022
557adc1
wip
laruh Dec 7, 2022
a58b40f
use iterate_over_urls func in get_rpc_client func. need to handle err…
laruh Dec 7, 2022
476f8d7
wip
laruh Dec 7, 2022
1b77448
wip
laruh Dec 8, 2022
0252557
leave WrongRpcClient in TendermintCoinRpcError
laruh Dec 8, 2022
f3c9753
add RpcClientError for match
laruh Dec 8, 2022
5c684bb
remove some notes
laruh Dec 8, 2022
0632c4c
Merge remote-tracking branch 'origin/dev' into tendermint-multiple-rp…
laruh Dec 12, 2022
f7a0808
Merge remote-tracking branch 'origin/dev' into tendermint-multiple-rp…
laruh Dec 12, 2022
e7c23df
fmt
laruh Dec 12, 2022
619b607
Merge remote-tracking branch 'origin/dev' into tendermint-multiple-rp…
laruh Dec 12, 2022
af6a7e7
save dev state, some notes added
laruh Dec 12, 2022
9f86b4a
wip
laruh Dec 14, 2022
d02353e
wip
laruh Dec 14, 2022
dfc4e3e
wip
laruh Dec 14, 2022
26acb77
impl RpcCommonOps for TendermintCoin, use just AsyncMutex
laruh Dec 14, 2022
28229b5
remove RpcCommonError and RpcClientEnum, use types in trait instead
laruh Dec 14, 2022
f137824
use retain to filter valid urls
laruh Dec 14, 2022
a1532d3
add some notes
laruh Dec 15, 2022
8c71642
return error if we have one or more invalid urls
laruh Dec 15, 2022
238cead
remove pub and dead_code
laruh Dec 15, 2022
3918e1c
use rotate_right
laruh Dec 16, 2022
a25ffaa
Merge remote-tracking branch 'origin/dev' into tendermint-multiple-rp…
laruh Dec 16, 2022
d3b46fe
add rotate_left, into_iter
laruh Dec 16, 2022
5edff34
use slice::join, return MmResult
laruh Dec 19, 2022
b561c11
add check rpc_urls.is_empty
laruh Dec 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions mm2src/coins/lp_coins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3534,3 +3534,13 @@ where
b.block_height.cmp(&a.block_height)
}
}

/// Use trait in the case, when we have to send requests to rpc client.
#[async_trait]
pub trait RpcCommonOps {
type RpcClient;
type Error;

/// Returns an alive RPC client or returns an error if no RPC endpoint is currently available.
async fn get_live_client(&self) -> Result<Self::RpcClient, Self::Error>;
}
106 changes: 74 additions & 32 deletions mm2src/coins/tendermint/tendermint_coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{big_decimal_from_sat_unsigned, BalanceError, BalanceFut, BigDecimal,
CoinBalance, CoinFutSpawner, FeeApproxStage, FoundSwapTxSpend, HistorySyncState, MarketCoinOps, MmCoin,
NegotiateSwapContractAddrErr, PaymentInstructions, PaymentInstructionsErr, PrivKeyBuildPolicy,
PrivKeyPolicyNotAllowed, RawTransactionError, RawTransactionFut, RawTransactionRequest, RawTransactionRes,
SearchForSwapTxSpendInput, SendMakerPaymentArgs, SendMakerRefundsPaymentArgs,
RpcCommonOps, SearchForSwapTxSpendInput, SendMakerPaymentArgs, SendMakerRefundsPaymentArgs,
SendMakerSpendsTakerPaymentArgs, SendTakerPaymentArgs, SendTakerRefundsPaymentArgs,
SendTakerSpendsMakerPaymentArgs, SignatureError, SignatureResult, SwapOps, TradeFee, TradePreimageError,
TradePreimageFut, TradePreimageResult, TradePreimageValue, TransactionDetails, TransactionEnum,
Expand All @@ -24,7 +24,7 @@ use bitcrypto::{dhash160, sha256};
use common::executor::{abortable_queue::AbortableQueue, AbortableSystem};
use common::executor::{AbortedError, Timer};
use common::log::warn;
use common::{get_utc_timestamp, log, now_ms, Future01CompatExt, DEX_FEE_ADDR_PUBKEY};
use common::{get_utc_timestamp, now_ms, Future01CompatExt, DEX_FEE_ADDR_PUBKEY};
use cosmrs::bank::MsgSend;
use cosmrs::crypto::secp256k1::SigningKey;
use cosmrs::proto::cosmos::auth::v1beta1::{BaseAccount, QueryAccountRequest, QueryAccountResponse};
Expand Down Expand Up @@ -164,9 +164,48 @@ impl TendermintConf {
}
}

struct TendermintRpcClient(AsyncMutex<TendermintRpcClientImpl>);

struct TendermintRpcClientImpl {
rpc_clients: Vec<HttpClient>,
}

#[async_trait]
impl RpcCommonOps for TendermintCoin {
type RpcClient = HttpClient;
type Error = TendermintCoinRpcError;

async fn get_live_client(&self) -> Result<Self::RpcClient, Self::Error> {
let mut client_impl = self.client.0.lock().await;
let mut new_clients = client_impl.rpc_clients.clone();
for (i, client) in client_impl.rpc_clients.iter().enumerate() {
if client
.perform(HealthRequest)
.timeout(Duration::from_secs(3))
.await
.is_ok()
{
let res_client = client.clone();
// if the first client is alive, no need to change rpc_clients field
return if i == 0 {
Ok(res_client)
} else {
let len = new_clients.len();
// move the last len - i elements to front, so i client will be the first
new_clients.rotate_right(len - i);
client_impl.rpc_clients = new_clients;
Ok(res_client)
};
}
}
return Err(TendermintCoinRpcError::RpcClientError(
"All the current rpc nodes are unavailable.".to_string(),
));
}
shamardy marked this conversation as resolved.
Show resolved Hide resolved
}

pub struct TendermintCoinImpl {
ticker: String,
rpc_clients: Vec<HttpClient>,
/// As seconds
avg_blocktime: u8,
/// My address
Expand All @@ -183,6 +222,7 @@ pub struct TendermintCoinImpl {
/// or on [`MmArc::stop`].
pub(super) abortable_system: AbortableQueue,
pub(crate) history_sync_state: Mutex<HistorySyncState>,
client: TendermintRpcClient,
}

#[derive(Clone)]
Expand Down Expand Up @@ -226,6 +266,7 @@ pub enum TendermintCoinRpcError {
Prost(DecodeError),
InvalidResponse(String),
PerformError(String),
RpcClientError(String),
}

impl From<DecodeError> for TendermintCoinRpcError {
Expand All @@ -242,6 +283,7 @@ impl From<TendermintCoinRpcError> for BalanceError {
TendermintCoinRpcError::InvalidResponse(e) => BalanceError::InvalidResponse(e),
TendermintCoinRpcError::Prost(e) => BalanceError::InvalidResponse(e.to_string()),
TendermintCoinRpcError::PerformError(e) => BalanceError::Transport(e),
TendermintCoinRpcError::RpcClientError(e) => BalanceError::Transport(e),
}
}
}
Expand All @@ -252,6 +294,7 @@ impl From<TendermintCoinRpcError> for ValidatePaymentError {
TendermintCoinRpcError::InvalidResponse(e) => ValidatePaymentError::InvalidRpcResponse(e),
TendermintCoinRpcError::Prost(e) => ValidatePaymentError::InvalidRpcResponse(e.to_string()),
TendermintCoinRpcError::PerformError(e) => ValidatePaymentError::Transport(e),
TendermintCoinRpcError::RpcClientError(e) => ValidatePaymentError::Transport(e),
}
}
}
Expand Down Expand Up @@ -381,25 +424,9 @@ impl TendermintCommons for TendermintCoin {
Ok(result)
}

// TODO
// Save one working client to the coin context, only try others once it doesn't
// work anymore.
// Also, try couple times more on health check errors.
#[inline(always)]
async fn rpc_client(&self) -> MmResult<HttpClient, TendermintCoinRpcError> {
for rpc_client in self.rpc_clients.iter() {
match rpc_client.perform(HealthRequest).timeout(Duration::from_secs(3)).await {
Ok(Ok(_)) => return Ok(rpc_client.clone()),
Ok(Err(e)) => log::warn!(
"Recieved error from Tendermint rpc node during health check. Error: {:?}",
e
),
Err(_) => log::warn!("Tendermint rpc node: {:?} got timeout during health check", rpc_client),
};
}

MmError::err(TendermintCoinRpcError::PerformError(
"All the current rpc nodes are unavailable.".to_string(),
))
self.get_live_client().await.map_to_mm(|e| e)
}
}

Expand Down Expand Up @@ -430,17 +457,12 @@ impl TendermintCoin {
}
})?;

let rpc_clients: Result<Vec<HttpClient>, _> = rpc_urls
.iter()
.map(|url| {
HttpClient::new(url.as_str()).map_to_mm(|e| TendermintInitError {
ticker: ticker.clone(),
kind: TendermintInitErrorKind::RpcClientInitError(e.to_string()),
})
})
.collect();
let rpc_clients = clients_from_urls(rpc_urls.as_ref()).map_to_mm(|e| TendermintInitError {
ticker: ticker.clone(),
kind: TendermintInitErrorKind::RpcClientInitError(e),
})?;

let rpc_clients = rpc_clients?;
let client_impl = TendermintRpcClientImpl { rpc_clients };

let chain_id = ChainId::try_from(protocol_info.chain_id).map_to_mm(|e| TendermintInitError {
ticker: ticker.clone(),
Expand Down Expand Up @@ -470,7 +492,6 @@ impl TendermintCoin {

Ok(TendermintCoin(Arc::new(TendermintCoinImpl {
ticker,
rpc_clients,
account_id,
account_prefix: protocol_info.account_prefix,
priv_key: priv_key.to_vec(),
Expand All @@ -483,6 +504,7 @@ impl TendermintCoin {
tokens_info: PaMutex::new(HashMap::new()),
abortable_system,
history_sync_state: Mutex::new(history_sync_state),
client: TendermintRpcClient(AsyncMutex::new(client_impl)),
})))
}

Expand Down Expand Up @@ -1357,6 +1379,26 @@ impl TendermintCoin {
}
}

fn clients_from_urls(rpc_urls: &[String]) -> Result<Vec<HttpClient>, String> {
let mut clients = Vec::new();
let mut errors = Vec::new();
// check that all urls are valid
// keep all invalid urls in one vector to show all of them in error
for url in rpc_urls.iter() {
match HttpClient::new(url.as_str()) {
Ok(client) => clients.push(client),
Err(e) => errors.push(format!("Url {} is invalid, got error {}", url, e)),
}
}
drop_mutability!(clients);
drop_mutability!(errors);
if !errors.is_empty() {
let errors: String = errors.iter().map(|e| format!("{:?}", e)).collect();
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved
return Err(errors);
}
Ok(clients)
}

#[async_trait]
#[allow(unused_variables)]
impl MmCoin for TendermintCoin {
Expand Down