Skip to content

Commit

Permalink
chore: self-review
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Oct 24, 2024
1 parent 50ad84c commit e4e667a
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 117 deletions.
24 changes: 0 additions & 24 deletions packages/rs-dapi-client/src/request_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
use std::time::Duration;

use crate::transport::TransportRequest;

/// Default low-level client timeout
const DEFAULT_CONNECT_TIMEOUT: Option<Duration> = None;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
Expand Down Expand Up @@ -79,25 +77,3 @@ pub struct AppliedRequestSettings {
/// Ban DAPI address if node not responded or responded with error.
pub ban_failed_address: bool,
}

impl AppliedRequestSettings {
/// Create [AppliedRequestSettings] from [RequestSettings] with default values.
///
/// Combine provided [RequestSettings] together with [request-level overrides](TransportRequest::SETTINGS_OVERRIDES).
///
///
/// # Arguments
///
/// * `global_settings` - global settings for all requests.
/// * `request_settings` - settings for a specific request.
pub fn from_settings<R: TransportRequest>(
global_settings: &RequestSettings,
request_settings: &RequestSettings,
) -> Self {
RequestSettings::default()
.override_by(*global_settings)
.override_by(R::SETTINGS_OVERRIDES)
.override_by(*request_settings)
.finalize()
}
}
16 changes: 4 additions & 12 deletions packages/rs-sdk/src/platform/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! traits. The associated [Fetch::Request]` type needs to implement [TransportRequest].
use crate::mock::MockResponse;
use crate::retry;
use crate::sync::retry;
use crate::{error::Error, platform::query::Query, Sdk};
use dapi_grpc::platform::v0::{self as platform_proto, Proof, ResponseMetadata};
use dpp::voting::votes::Vote;
Expand Down Expand Up @@ -155,20 +155,15 @@ where
query: Q,
settings: Option<RequestSettings>,
) -> Result<(Option<Self>, ResponseMetadata, Proof), Error> {
let request1: <Self as Fetch>::Request = query.query(sdk.prove())?;
let request = &request1;
let request: &<Self as Fetch>::Request = &query.query(sdk.prove())?;

let fut = |settings: RequestSettings| {
async move {
let response = request
.clone()
.execute(sdk, settings)
.await // TODO: We need better way to handle execution response and errors
.map_err(|execution_error| ExecutionError {
inner: Error::from(execution_error.inner),
address: execution_error.address,
retries: execution_error.retries,
})?;
.map_err(|execution_error| execution_error.into())?;

let address = response.address.clone();
let retries = response.retries;
Expand Down Expand Up @@ -203,10 +198,7 @@ where
.dapi_client_settings
.override_by(settings.unwrap_or_default());

retry!(settings, fut)
.await
.map(|x| x.into_inner())
.map_err(|e| e.into_inner())
retry(settings, fut).await.into_inner()
}

/// Fetch single object from Platform.
Expand Down
19 changes: 10 additions & 9 deletions packages/rs-sdk/src/platform/fetch_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use crate::{
error::Error,
mock::MockResponse,
platform::{document_query::DocumentQuery, query::Query},
retry, Sdk,
sync::retry,
Sdk,
};
use dapi_grpc::platform::v0::{
GetContestedResourceIdentityVotesRequest, GetContestedResourceVoteStateRequest,
Expand Down Expand Up @@ -146,22 +147,22 @@ where
) -> Result<O, Error> {
let request = &query.query(sdk.prove())?;
let closure = |settings: RequestSettings| async move {
let request = request.clone();

let grpc_response = request
.clone()
.execute(sdk, settings)
.await
.map_err(|e| e.into())?;

let address = grpc_response.address.clone();
let retries = grpc_response.retries;
let response = grpc_response.into_inner();
let ExecutionResponse {
address,
retries,
inner: response,
} = grpc_response;

let object_type = std::any::type_name::<Self>().to_string();
tracing::trace!(request = ?request, response = ?response, object_type, "fetched object from platform");

sdk.parse_proof::<<Self as FetchMany<K, O>>::Request, O>(request, response)
sdk.parse_proof::<<Self as FetchMany<K, O>>::Request, O>(request.clone(), response)
.await
.map(|o| ExecutionResponse {
inner: o,
Expand All @@ -177,7 +178,7 @@ where

let settings = sdk.dapi_client_settings;

retry!(settings, closure)
retry(settings, closure)
.await
.into_inner()
.map(|o| o.unwrap_or_default())
Expand Down Expand Up @@ -255,7 +256,7 @@ impl FetchMany<Identifier, Documents> for Document {
) -> Result<Documents, Error> {
let document_query: &DocumentQuery = &query.query(sdk.prove())?;

retry!(RequestSettings::default(), |settings| async move {
retry(RequestSettings::default(), |settings| async move {
let request = document_query.clone();
let result = request.execute(sdk, settings).await.map_err(|e| e.into())?;

Expand Down
4 changes: 2 additions & 2 deletions packages/rs-sdk/src/platform/fetch_unproved.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{types::evonode::EvoNode, Query};
use crate::mock::MockResponse;
use crate::Sdk;
use crate::{error::Error, retry};
use crate::{error::Error, sync::retry};
use dapi_grpc::platform::v0::{
self as platform_proto, GetStatusRequest, GetStatusResponse, ResponseMetadata,
};
Expand Down Expand Up @@ -107,7 +107,7 @@ where
};

let settings = sdk.dapi_client_settings.override_by(settings);
retry!(settings, closure).await.into_inner()
retry(settings, closure).await.into_inner()
}
}

Expand Down
120 changes: 50 additions & 70 deletions packages/rs-sdk/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,87 +94,68 @@ async fn worker<F: Future>(
Ok(())
}

/// Retry the provided code block using [`RetryFuture`].
/// Retry the provided closure.
///
/// This macro defines a variable `settings` that is visible within the $code block.
/// The $code block should use this variable when executing requests.
/// `$code` should return [`ExecutionResult`].
/// This function is used to retry async code. It takes into account number of retries already executed by lower
/// layers and stops retrying once the maximum number of retries is reached.
///
/// ## Troubleshooting
/// The `settings` should contain maximum number of retries that should be executed. In case of failure, total number of
/// requests sent is expected to be at least `settings.retries + 1` (initial request + `retries` configured in settings).
/// The actual number of requests sent can be higher, as the lower layers can retry the request multiple times.
///
/// Compiler error: `no method named retry found for closure`:
/// - ensure returned value is [`ExecutionResult`].,
/// - consider adding `.await` at the end of the closure.
/// `code` should be a `FnMut()` closure that returns a future that should be retried.
/// It takes [`RequestSettings`] as an argument and returns [`ExecutionResult`].
/// Retry mechanism can change [`RequestSettings`] between invocations of the `code` closure
/// to limit the number of retries for lower layers.
///
/// ## Parameters
///
/// - `settings` - global settings with any request-specific settings overrides applied.
/// - `code` - closure that returns a future that should be retried. It should take [`RequestSettings`] as
/// an argument and return [`ExecutionResult`].
///
/// ## Returns
///
/// Returns future that resolves to [`ExecutionResult`].
///
/// ## Example
///
/// ```rust
/// # use dash_sdk::RequestSettings;
/// # use dash_sdk::error::{Error,StaleNodeError};
/// # use rs_dapi_client::{ExecutionResult, ExecutionError};
/// async fn retry_test_function(settings: RequestSettings) -> ExecutionResult<(), dash_sdk::Error> {
/// // do something
/// # unimplemented!()
/// Err(ExecutionError {
/// inner: Error::StaleNode(StaleNodeError::Height{
/// expected_height: 10,
/// received_height: 3,
/// tolerance_blocks: 1,
/// }),
/// retries: 0,
/// address: None,
/// })
/// }
/// #[tokio::main]
/// # async fn main() {
/// let global_settings = RequestSettings::default();
/// let closure = |settings| retry_test_function(settings);
/// dash_sdk::retry!(global_settings, closure).expect_err("should fail");
/// # }
/// async fn main() {
/// let global_settings = RequestSettings::default();
/// dash_sdk::sync::retry(global_settings, retry_test_function).await.expect_err("should fail");
/// }
/// ```
#[macro_export]
macro_rules! retry {
($settings:expr, $code:expr) => {{
let fut = move |s: rs_dapi_client::RequestSettings| {
let ss = s.clone();
$code(ss)
};
$crate::sync::do_retry(fut, $settings)
}};
}

// pub trait RetryFuture<R, E> {
// /// On error, retry the provided future `max_retries` times.
// ///
// /// This function is used to retry async functions. It takes into account number of retries already executed by lower
// /// layers and stops retrying if the maximum number of retries is reached.
// ///
// /// `Self` should be a closure that returns a future that should be retried. See below for more details.
// ///
// /// The `settings` contain maximum number of retries that should be executed. In case of failure, total number of
// /// requests sent is expected to be at least `max_retries + 1` (initial request + `max_retries` retries).
// /// It should contain global settings with any request-specific settings overrides applied.
// ///
// /// `settings` can be modified between retries; provided future SHOULD re-apply the settings on the beginning of each retry.
// ///
// /// Note that actual number of requests sent can be higher than specified in `settings`, as the retries on
// /// lower layers are not directly controlled by this function.
// ///
// /// ## Writing a retryable closure
// ///
// /// In order to allow multiple executions, this trait is implemented for closures that return a future to be executed.
// /// The closure should be `FnMut(RequestSettings) -> Future<Output = ExecutionResult<R, E>>`.
// /// The closure should be able to be called multiple times, with different settings each time.
// async fn retry(self, settings: RequestSettings) -> ExecutionResult<R, E>;
// }

// impl<Fut, FutureFn, R, E> RetryFuture<R, E> for FutureFn
// where
// Fut: Future<Output = ExecutionResult<R, E>>,
// FutureFn: FnMut(RequestSettings) -> Fut, // eg. FnMut() -> BoxFuture<'a, Result<ExecutionResponse<T>, ExecutionError<E>>>,
// E: CanRetry + Debug,
// {
// async fn retry(self, settings: RequestSettings) -> ExecutionResult<R, E> {
// do_retry(self, settings).await
// }
// }

/// Retry the provided future `max_retries` times.
///
/// See trait [`RetryFuture`] for more details.
pub async fn do_retry<Fut, FutureFn, R, E>(
f: FutureFn,
/// ## Troubleshooting
///
/// Compiler error: `no method named retry found for closure`:
/// - ensure returned value is [`ExecutionResult`].,
/// - consider adding `.await` at the end of the closure.
///
///
/// ## See also
///
/// - [`::backon`] crate that is used by this function.
pub async fn retry<Fut, FutureFn, R, E>(
settings: RequestSettings,
code: FutureFn,
) -> ExecutionResult<R, E>
where
Fut: Future<Output = ExecutionResult<R, E>>,
Expand All @@ -192,7 +173,7 @@ where
let settings = ArcSwap::new(Arc::new(settings));

// We need a mutex here, as `closure` must be FnMut()
let inner_fn = Arc::new(Mutex::new(f));
let inner_fn = Arc::new(Mutex::new(code));
let closure_settings = &settings;
let closure = move || {
let inner_fn = inner_fn.clone();
Expand Down Expand Up @@ -235,7 +216,6 @@ where
.await;

result.map_err(|mut e| {
// e.retries = retry_count.load(Ordering::Relaxed);
e.retries = retries;
e
})
Expand Down Expand Up @@ -360,7 +340,7 @@ mod test {
}

#[tokio::test]
async fn test_retry_macro() {
async fn test_retry() {
for _ in 0..5 {
REQUEST_COUNTER.store(0, Ordering::Relaxed);
let expected_requests: usize = (random::<u8>() % 100 + 1) as usize;
Expand All @@ -369,7 +349,7 @@ mod test {
global_settings.retries = Some(expected_requests - 1);

// let closure = |s| retry_test_function(s);
retry!(global_settings, retry_test_function)
retry(global_settings, retry_test_function)
.await
.expect_err("should fail");

Expand Down

0 comments on commit e4e667a

Please sign in to comment.