Skip to content

Commit

Permalink
feat(sdk): retry impl
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Oct 24, 2024
1 parent 4343b6e commit 50ad84c
Show file tree
Hide file tree
Showing 17 changed files with 425 additions and 161 deletions.
57 changes: 51 additions & 6 deletions packages/rs-dapi-client/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{Address, CanRetry, DapiClientError, RequestSettings};
use dapi_grpc::mock::Mockable;
use dapi_grpc::platform::VersionedGrpcResponse;
use dapi_grpc::tonic::async_trait;
use http_serde::http::Uri;
use std::fmt::Debug;

#[async_trait]
Expand All @@ -20,6 +21,12 @@ pub trait DapiRequestExecutor {
<R::Client as TransportClient>::Error: Mockable;
}

/// Unwrap wrapped types
pub trait IntoInner<T> {
/// Unwrap the inner type
fn into_inner(self) -> T;
}

/// Error happened during request execution.
#[derive(Debug, Clone, thiserror::Error, Eq, PartialEq)]
#[error("{inner}")]
Expand All @@ -31,11 +38,27 @@ pub struct ExecutionError<E> {
/// The address of the node that was used for the request
pub address: Option<Address>,
}

impl<E> ExecutionError<E> {
/// Convert inner error type without loosing retries and address
pub fn into<F>(self) -> ExecutionError<F>
where
F: From<E>,
{
ExecutionError {
inner: self.inner.into(),
retries: self.retries,
address: self.address,
}
}
}

impl<E, I> IntoInner<I> for ExecutionError<E>
where
E: Into<I>,
{
/// Unwrap the error cause
pub fn into_inner(self) -> E {
self.inner
fn into_inner(self) -> I {
self.inner.into()
}
}

Expand All @@ -56,10 +79,13 @@ pub struct ExecutionResponse<R> {
pub address: Address,
}

impl<R> ExecutionResponse<R> {
impl<R, I> IntoInner<I> for ExecutionResponse<R>
where
R: Into<I>,
{
/// Unwrap the response
pub fn into_inner(self) -> R {
self.inner
fn into_inner(self) -> I {
self.inner.into()
}
}

Expand All @@ -77,5 +103,24 @@ impl<T: VersionedGrpcResponse> VersionedGrpcResponse for ExecutionResponse<T> {
}
}

impl<R> From<R> for ExecutionResponse<R> {
fn from(inner: R) -> Self {
Self {
inner,
retries: 0,
address: Uri::default().into(),
}
}
}

/// Result of request execution
pub type ExecutionResult<R, E> = Result<ExecutionResponse<R>, ExecutionError<E>>;

impl<T, E> IntoInner<Result<T, E>> for ExecutionResult<T, E> {
fn into_inner(self) -> Result<T, E> {
match self {
Ok(response) => Ok(response.into_inner()),
Err(error) => Err(error.into_inner()),
}
}
}
24 changes: 24 additions & 0 deletions packages/rs-dapi-client/src/request_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
use std::time::Duration;

use crate::transport::TransportRequest;

/// Default low-level client timeout
const DEFAULT_CONNECT_TIMEOUT: Option<Duration> = None;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
Expand Down Expand Up @@ -77,3 +79,25 @@ pub struct AppliedRequestSettings {
/// Ban DAPI address if node not responded or responded with error.
pub ban_failed_address: bool,
}

impl AppliedRequestSettings {
/// Create [AppliedRequestSettings] from [RequestSettings] with default values.
///
/// Combine provided [RequestSettings] together with [request-level overrides](TransportRequest::SETTINGS_OVERRIDES).
///
///
/// # Arguments
///
/// * `global_settings` - global settings for all requests.
/// * `request_settings` - settings for a specific request.
pub fn from_settings<R: TransportRequest>(
global_settings: &RequestSettings,
request_settings: &RequestSettings,
) -> Self {
RequestSettings::default()
.override_by(*global_settings)
.override_by(R::SETTINGS_OVERRIDES)
.override_by(*request_settings)
.finalize()
}
}
6 changes: 2 additions & 4 deletions packages/rs-sdk/src/core/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use dpp::identity::state_transition::asset_lock_proof::chain::ChainAssetLockProo
use dpp::identity::state_transition::asset_lock_proof::InstantAssetLockProof;
use dpp::prelude::AssetLockProof;

use rs_dapi_client::{DapiRequestExecutor, RequestSettings};
use rs_dapi_client::{DapiRequestExecutor, IntoInner, RequestSettings};
use std::time::Duration;
use tokio::time::{sleep, timeout};

Expand Down Expand Up @@ -58,7 +58,6 @@ impl Sdk {
.await
// TODO: We need better way to handle execution response and errors
.map(|execution_response| execution_response.into_inner())
.map_err(|execution_error| execution_error.into_inner())
.map_err(|e| Error::DapiClientError(e.to_string()))
}

Expand Down Expand Up @@ -184,8 +183,7 @@ impl Sdk {
RequestSettings::default(),
)
.await // TODO: We need better way to handle execution errors
.map_err(|error| error.into_inner())?
.into_inner();
.into_inner()?;

core_chain_locked_height = height;

Expand Down
51 changes: 27 additions & 24 deletions packages/rs-sdk/src/platform/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//! traits. The associated [Fetch::Request]` type needs to implement [TransportRequest].
use crate::mock::MockResponse;
use crate::retry;
use crate::{error::Error, platform::query::Query, Sdk};
use dapi_grpc::platform::v0::{self as platform_proto, Proof, ResponseMetadata};
use dpp::voting::votes::Vote;
Expand All @@ -17,10 +18,8 @@ use dpp::{
prelude::Identity,
};
use drive_proof_verifier::FromProof;
use futures::future::BoxFuture;
use futures::FutureExt;
use rs_dapi_client::{transport::TransportRequest, DapiRequest, RequestSettings};
use rs_dapi_client::{ExecutionError, ExecutionResponse};
use rs_dapi_client::{ExecutionError, ExecutionResponse, IntoInner};
use std::fmt::Debug;

use super::types::identity::IdentityRequest;
Expand Down Expand Up @@ -156,51 +155,55 @@ where
query: Q,
settings: Option<RequestSettings>,
) -> Result<(Option<Self>, ResponseMetadata, Proof), Error> {
let request1: <Self as Fetch>::Request = query.query(sdk.prove())?.clone();
let request1: <Self as Fetch>::Request = query.query(sdk.prove())?;
let request = &request1;
let fut = || -> BoxFuture< Result<ExecutionResponse<(Option<Self>, ResponseMetadata, Proof)>, ExecutionError<Error>>> {
async {

let fut = |settings: RequestSettings| {
async move {
let response = request
.clone()
.execute(sdk, settings.unwrap_or_default())
.execute(sdk, settings)
.await // TODO: We need better way to handle execution response and errors
.map_err(|execution_error|
ExecutionError{
inner:Error::from(execution_error.inner),
address: execution_error.address,
retries: execution_error.retries,
})?;
.map_err(|execution_error| ExecutionError {
inner: Error::from(execution_error.inner),
address: execution_error.address,
retries: execution_error.retries,
})?;

let address = response.address.clone();
let retries = response.retries;
let grpc_response =response.into_inner();
let address = response.address.clone();
let retries = response.retries;
let grpc_response = response.into_inner();

let object_type = std::any::type_name::<Self>().to_string();
tracing::trace!(request = ?request, response = ?grpc_response, object_type, "fetched object from platform");

let (object, response_metadata, proof): (Option<Self>, ResponseMetadata, Proof) =
sdk.parse_proof_with_metadata_and_proof(request.clone(), grpc_response)
.await.map_err(|e| ExecutionError{
.await
.map_err(|e| ExecutionError {
inner: e,
address: Some(address.clone()),
retries,
}) ?;
})?;

let o= match object {
let o = match object {
Some(item) => Ok((item.into(), response_metadata, proof)),
None => Ok((None, response_metadata, proof)),
};

o.map(|x| ExecutionResponse{
o.map(|x| ExecutionResponse {
inner: x,
address,
retries,
})
}.boxed()
}
};
// TODO: correct retry configuration
let configured_retries = settings.unwrap_or_default().retries.unwrap_or(10);
crate::sync::retry(fut, configured_retries)

let settings = sdk
.dapi_client_settings
.override_by(settings.unwrap_or_default());

retry!(settings, fut)
.await
.map(|x| x.into_inner())
.map_err(|e| e.into_inner())
Expand Down
Loading

0 comments on commit 50ad84c

Please sign in to comment.