Skip to content

Commit

Permalink
Multiple rpcs optimization (#1653)
Browse files Browse the repository at this point in the history
* wip

* use rotate_left in send_request (not wasm target)

* use rotate_left in send_request (wasm target)

* wip impl RpcCommonOps for LightRpcClient

* impl ZRpcOps for LightRpcClient

* fix wasm

* remove notes

* add empty line

* move drop_mutability!(rpc_clients), delete let e
  • Loading branch information
laruh authored Feb 15, 2023
1 parent 69aff14 commit fb04863
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 77 deletions.
43 changes: 32 additions & 11 deletions mm2src/coins/eth/web3_transport/http_transport.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::eth::{web3_transport::Web3SendOut, EthCoin, GuiAuthMessages, RpcTransportEventHandler,
RpcTransportEventHandlerShared, Web3RpcError};
use common::APPLICATION_JSON;
use futures::lock::Mutex as AsyncMutex;
#[cfg(not(target_arch = "wasm32"))] use futures::FutureExt;
use futures::TryFutureExt;
use http::header::CONTENT_TYPE;
Expand Down Expand Up @@ -34,10 +35,18 @@ fn single_response<T: Deref<Target = [u8]>>(response: T, rpc_url: &str) -> Resul
}
}

#[derive(Debug)]
struct HttpTransportRpcClient(AsyncMutex<HttpTransportRpcClientImpl>);

#[derive(Debug)]
struct HttpTransportRpcClientImpl {
nodes: Vec<HttpTransportNode>,
}

#[derive(Clone, Debug)]
pub struct HttpTransport {
id: Arc<AtomicUsize>,
nodes: Vec<HttpTransportNode>,
client: Arc<HttpTransportRpcClient>,
event_handlers: Vec<RpcTransportEventHandlerShared>,
pub(crate) gui_auth_validation_generator: Option<GuiAuthValidationGenerator>,
}
Expand All @@ -52,9 +61,10 @@ impl HttpTransport {
#[cfg(test)]
#[inline]
pub fn new(nodes: Vec<HttpTransportNode>) -> Self {
let client_impl = HttpTransportRpcClientImpl { nodes };
HttpTransport {
id: Arc::new(AtomicUsize::new(0)),
nodes,
client: Arc::new(HttpTransportRpcClient(AsyncMutex::new(client_impl))),
event_handlers: Default::default(),
gui_auth_validation_generator: None,
}
Expand All @@ -65,9 +75,10 @@ impl HttpTransport {
nodes: Vec<HttpTransportNode>,
event_handlers: Vec<RpcTransportEventHandlerShared>,
) -> Self {
let client_impl = HttpTransportRpcClientImpl { nodes };
HttpTransport {
id: Arc::new(AtomicUsize::new(0)),
nodes,
client: Arc::new(HttpTransportRpcClient(AsyncMutex::new(client_impl))),
event_handlers,
gui_auth_validation_generator: None,
}
Expand All @@ -79,10 +90,11 @@ impl HttpTransport {
uri: url.parse().unwrap(),
gui_auth,
}];
let client_impl = HttpTransportRpcClientImpl { nodes };

HttpTransport {
id: Arc::new(AtomicUsize::new(0)),
nodes,
client: Arc::new(HttpTransportRpcClient(AsyncMutex::new(client_impl))),
event_handlers: Default::default(),
gui_auth_validation_generator: None,
}
Expand All @@ -104,7 +116,7 @@ impl Transport for HttpTransport {
Box::new(
send_request(
request,
self.nodes.clone(),
self.client.clone(),
self.event_handlers.clone(),
self.gui_auth_validation_generator.clone(),
)
Expand All @@ -117,7 +129,7 @@ impl Transport for HttpTransport {
fn send(&self, _id: RequestId, request: Call) -> Self::Out {
let fut = send_request(
request,
self.nodes.clone(),
self.client.clone(),
self.event_handlers.clone(),
self.gui_auth_validation_generator.clone(),
);
Expand Down Expand Up @@ -167,7 +179,7 @@ fn handle_gui_auth_payload_if_activated(
#[cfg(not(target_arch = "wasm32"))]
async fn send_request(
request: Call,
nodes: Vec<HttpTransportNode>,
client: Arc<HttpTransportRpcClient>,
event_handlers: Vec<RpcTransportEventHandlerShared>,
gui_auth_validation_generator: Option<GuiAuthValidationGenerator>,
) -> Result<Json, Error> {
Expand All @@ -184,7 +196,9 @@ async fn send_request(

let serialized_request = to_string(&request);

for node in nodes.iter() {
let mut client_impl = client.0.lock().await;

for (i, node) in client_impl.nodes.clone().iter().enumerate() {
let serialized_request =
match handle_gui_auth_payload_if_activated(&gui_auth_validation_generator, node, &request) {
Ok(Some(r)) => r,
Expand Down Expand Up @@ -238,6 +252,8 @@ async fn send_request(
continue;
}

client_impl.nodes.rotate_left(i);

return single_response(body, &node.uri.to_string());
}

Expand All @@ -247,14 +263,16 @@ async fn send_request(
#[cfg(target_arch = "wasm32")]
async fn send_request(
request: Call,
nodes: Vec<HttpTransportNode>,
client: Arc<HttpTransportRpcClient>,
event_handlers: Vec<RpcTransportEventHandlerShared>,
gui_auth_validation_generator: Option<GuiAuthValidationGenerator>,
) -> Result<Json, Error> {
let serialized_request = to_string(&request);

let mut transport_errors = Vec::new();
for node in nodes.iter() {
let mut client_impl = client.0.lock().await;

for (i, node) in client_impl.nodes.clone().iter().enumerate() {
let serialized_request =
match handle_gui_auth_payload_if_activated(&gui_auth_validation_generator, node, &request) {
Ok(Some(r)) => r,
Expand All @@ -266,7 +284,10 @@ async fn send_request(
};

match send_request_once(serialized_request.clone(), &node.uri, &event_handlers).await {
Ok(response_json) => return Ok(response_json),
Ok(response_json) => {
client_impl.nodes.rotate_left(i);
return Ok(response_json);
},
Err(Error(ErrorKind::Transport(e), _)) => {
transport_errors.push(Web3RpcError::Transport(e));
},
Expand Down
3 changes: 1 addition & 2 deletions mm2src/coins/z_coin/z_coin_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ use zcash_primitives::transaction::builder::Error as ZTxBuilderError;
#[non_exhaustive]
pub enum UpdateBlocksCacheErr {
GrpcError(tonic::Status),
#[display(fmt = "Fail to send requests during clients iteration {:?}", _0)]
GrpcMultiError(Vec<tonic::Status>),
BlocksDbError(SqliteError),
ZcashSqliteError(ZcashClientError),
UtxoRpcError(UtxoRpcError),
InternalError(String),
JsonRpcError(JsonRpcError),
GetLiveLightClientError(String),
}

impl From<tonic::Status> for UpdateBlocksCacheErr {
Expand Down
137 changes: 73 additions & 64 deletions mm2src/coins/z_coin/z_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use mm2_err_handle::prelude::*;
use parking_lot::Mutex;
use prost::Message;
use protobuf::Message as ProtobufMessage;
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::str::FromStr;
Expand All @@ -39,7 +38,7 @@ use zcash_primitives::zip32::ExtendedFullViewingKey;
mod z_coin_grpc {
tonic::include_proto!("cash.z.wallet.sdk.rpc");
}
use crate::ZTransaction;
use crate::{RpcCommonOps, ZTransaction};
use rpc::v1::types::H256 as H256Json;
use z_coin_grpc::compact_tx_streamer_client::CompactTxStreamerClient;
use z_coin_grpc::{BlockId, BlockRange, ChainSpec, CompactBlock as TonicCompactBlock,
Expand Down Expand Up @@ -69,15 +68,43 @@ pub trait ZRpcOps {
async fn check_tx_existence(&mut self, tx_id: TxId) -> bool;
}

struct LightRpcClient {
rpc_clients: AsyncMutex<Vec<CompactTxStreamerClient<Channel>>>,
}

#[async_trait]
impl ZRpcOps for Vec<CompactTxStreamerClient<Channel>> {
async fn get_block_height(&mut self) -> Result<u64, MmError<UpdateBlocksCacheErr>> {
let block = send_multi_light_wallet_request(self, |client| {
impl RpcCommonOps for LightRpcClient {
type RpcClient = CompactTxStreamerClient<Channel>;
type Error = MmError<UpdateBlocksCacheErr>;

async fn get_live_client(&self) -> Result<Self::RpcClient, Self::Error> {
let mut clients = self.rpc_clients.lock().await;
for (i, mut client) in clients.clone().into_iter().enumerate() {
let request = tonic::Request::new(ChainSpec {});
client.get_latest_block(request)
})
.await
.map_to_mm(UpdateBlocksCacheErr::GrpcMultiError)?;
// use get_latest_block method as a health check
if client.get_latest_block(request).await.is_ok() {
clients.rotate_left(i);
return Ok(client);
}
}
return Err(MmError::new(UpdateBlocksCacheErr::GetLiveLightClientError(
"All the current light clients are unavailable.".to_string(),
)));
}
}

#[async_trait]
impl ZRpcOps for LightRpcClient {
async fn get_block_height(&mut self) -> Result<u64, MmError<UpdateBlocksCacheErr>> {
let request = tonic::Request::new(ChainSpec {});
let block = self
.get_live_client()
.await?
.get_latest_block(request)
.await
.map_to_mm(UpdateBlocksCacheErr::GrpcError)?
// return the message
.into_inner();
Ok(block.height)
}

Expand All @@ -87,21 +114,23 @@ impl ZRpcOps for Vec<CompactTxStreamerClient<Channel>> {
last_block: u64,
on_block: &mut OnCompactBlockFn,
) -> Result<(), MmError<UpdateBlocksCacheErr>> {
let mut response = send_multi_light_wallet_request(self, |client| {
let request = tonic::Request::new(BlockRange {
start: Some(BlockId {
height: start_block,
hash: Vec::new(),
}),
end: Some(BlockId {
height: last_block,
hash: Vec::new(),
}),
});
client.get_block_range(request)
})
.await
.map_to_mm(UpdateBlocksCacheErr::GrpcMultiError)?;
let request = tonic::Request::new(BlockRange {
start: Some(BlockId {
height: start_block,
hash: Vec::new(),
}),
end: Some(BlockId {
height: last_block,
hash: Vec::new(),
}),
});
let mut response = self
.get_live_client()
.await?
.get_block_range(request)
.await
.map_to_mm(UpdateBlocksCacheErr::GrpcError)?
.into_inner();
// without Pin method get_mut is not found in current scope
while let Some(block) = Pin::new(&mut response).get_mut().message().await? {
debug!("Got block {:?}", block);
Expand All @@ -113,29 +142,25 @@ impl ZRpcOps for Vec<CompactTxStreamerClient<Channel>> {
async fn check_tx_existence(&mut self, tx_id: TxId) -> bool {
let mut attempts = 0;
loop {
match send_multi_light_wallet_request(self, |client| {
let filter = TxFilter {
if let Ok(mut client) = self.get_live_client().await {
let request = tonic::Request::new(TxFilter {
block: None,
index: 0,
hash: tx_id.0.into(),
};
let request = tonic::Request::new(filter);
client.get_transaction(request)
})
.await
{
Ok(_) => break,
Err(e) => {
error!("Error on getting tx {}", tx_id);
let mut e = e;
if e.remove(0).message().contains(NO_TX_ERROR_CODE) {
if attempts >= 3 {
return false;
});
match client.get_transaction(request).await {
Ok(_) => break,
Err(e) => {
error!("Error on getting tx {}", tx_id);
if e.message().contains(NO_TX_ERROR_CODE) {
if attempts >= 3 {
return false;
}
attempts += 1;
}
attempts += 1;
}
Timer::sleep(30.).await;
},
Timer::sleep(30.).await;
},
}
}
}
true
Expand Down Expand Up @@ -417,6 +442,7 @@ pub(super) async fn init_light_client(
rpc_clients.push(CompactTxStreamerClient::new(tonic_channel));
}
drop_mutability!(errors);
drop_mutability!(rpc_clients);
// check if rpc_clients is empty, then for loop wasn't successful
if rpc_clients.is_empty() {
return MmError::err(ZcoinClientInitError::UrlIterFailure(errors));
Expand All @@ -435,8 +461,10 @@ pub(super) async fn init_light_client(
scan_interval_ms,
};

drop_mutability!(rpc_clients);
let abort_handle = spawn_abortable(light_wallet_db_sync_loop(sync_handle, Box::new(rpc_clients)));
let light_rpc_clients = LightRpcClient {
rpc_clients: AsyncMutex::new(rpc_clients),
};
let abort_handle = spawn_abortable(light_wallet_db_sync_loop(sync_handle, Box::new(light_rpc_clients)));

Ok((
SaplingSyncConnector::new_mutex_wrapped(sync_watcher, on_tx_gen_notifier, abort_handle),
Expand Down Expand Up @@ -781,22 +809,3 @@ pub(super) struct SaplingSyncGuard<'a> {
pub(super) _connector_guard: AsyncMutexGuard<'a, SaplingSyncConnector>,
pub(super) respawn_guard: SaplingSyncRespawnGuard,
}

// TODO need to refactor https://github.com/KomodoPlatform/atomicDEX-API/issues/1480
async fn send_multi_light_wallet_request<'a, Res, Fut, Fn>(
clients: &'a mut [CompactTxStreamerClient<Channel>],
mut req_fn: Fn,
) -> Result<Res, Vec<tonic::Status>>
where
Fut: Future<Output = Result<tonic::Response<Res>, tonic::Status>>,
Fn: FnMut(&'a mut CompactTxStreamerClient<Channel>) -> Fut,
{
let mut errors = Vec::new();
for client in clients.iter_mut() {
match req_fn(client).await {
Ok(res) => return Ok(res.into_inner()),
Err(e) => errors.push(e),
}
}
Err(errors)
}

0 comments on commit fb04863

Please sign in to comment.