From ee8875df80dfdd9078787e19bfee0dbee76a784b Mon Sep 17 00:00:00 2001 From: John DiSanti Date: Mon, 19 Jul 2021 17:39:11 -0700 Subject: [PATCH 1/7] Implement timeouts for LazyCachingCredentialsProvider --- aws/rust-runtime/aws-auth/Cargo.toml | 6 + aws/rust-runtime/aws-auth/src/provider.rs | 23 ++++ aws/rust-runtime/aws-auth/src/provider/env.rs | 2 + .../aws-auth/src/provider/lazy_caching.rs | 72 +++++++++--- .../aws-auth/src/provider/timeout.rs | 107 ++++++++++++++++++ aws/sdk/examples/sts/Cargo.toml | 2 +- .../sts/src/bin/credentials-provider.rs | 11 +- rust-runtime/smithy-http/Cargo.toml | 1 + rust-runtime/smithy-http/src/lib.rs | 1 + rust-runtime/smithy-http/src/sleep.rs | 35 ++++++ 10 files changed, 243 insertions(+), 17 deletions(-) create mode 100644 aws/rust-runtime/aws-auth/src/provider/timeout.rs create mode 100644 rust-runtime/smithy-http/src/sleep.rs diff --git a/aws/rust-runtime/aws-auth/Cargo.toml b/aws/rust-runtime/aws-auth/Cargo.toml index da29e6d42d..643a7e36c6 100644 --- a/aws/rust-runtime/aws-auth/Cargo.toml +++ b/aws/rust-runtime/aws-auth/Cargo.toml @@ -5,8 +5,13 @@ authors = ["AWS Rust SDK Team ", "Russell Cohen ), } @@ -28,6 +46,11 @@ impl Display for CredentialsError { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { CredentialsError::CredentialsNotLoaded => write!(f, "CredentialsNotLoaded"), + CredentialsError::ProviderTimedOut(d) => write!( + f, + "Credentials provider timed out after {} seconds", + d.as_secs() + ), CredentialsError::Unhandled(err) => write!(f, "{}", err), } } diff --git a/aws/rust-runtime/aws-auth/src/provider/env.rs b/aws/rust-runtime/aws-auth/src/provider/env.rs index 784a1d6fb7..e58f70b4c1 100644 --- a/aws/rust-runtime/aws-auth/src/provider/env.rs +++ b/aws/rust-runtime/aws-auth/src/provider/env.rs @@ -3,6 +3,8 @@ * SPDX-License-Identifier: Apache-2.0. */ +//! Credential provider implementation that pulls from environment variables + use crate::provider::{CredentialsError, ProvideCredentials}; use crate::Credentials; use std::collections::HashMap; diff --git a/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs b/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs index 4bcbd65ee4..c516441d95 100644 --- a/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs +++ b/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs @@ -3,9 +3,14 @@ * SPDX-License-Identifier: Apache-2.0. */ +//! Lazy, caching, credentials provider implementation + use crate::provider::cache::Cache; use crate::provider::time::TimeSource; -use crate::provider::{AsyncProvideCredentials, BoxFuture, CredentialsResult}; +use crate::provider::timeout::Timeout; +use crate::provider::{ + AsyncProvideCredentials, AsyncSleep, BoxFuture, CredentialsError, CredentialsResult, +}; use std::sync::Arc; use std::time::Duration; use tracing::{trace_span, Instrument}; @@ -14,10 +19,7 @@ const DEFAULT_REFRESH_TIMEOUT: Duration = Duration::from_secs(5); const DEFAULT_CREDENTIAL_EXPIRATION: Duration = Duration::from_secs(15 * 60); const DEFAULT_BUFFER_TIME: Duration = Duration::from_secs(10); -// TODO: Implement async runtime-agnostic timeouts -// TODO: Add catch_unwind() to handle panics -// TODO: Update doc comment below once catch_unwind and timeouts are implemented -// TODO: Update warning not to use this in the STS example once it's prod ready +// TODO: Rename refresh to load /// `LazyCachingCredentialsProvider` implements [`AsyncProvideCredentials`] by caching /// credentials that it loads by calling a user-provided [`AsyncProvideCredentials`] implementation. @@ -25,21 +27,19 @@ const DEFAULT_BUFFER_TIME: Duration = Duration::from_secs(10); /// For example, you can provide an [`AsyncProvideCredentials`] implementation that calls /// AWS STS's AssumeRole operation to get temporary credentials, and `LazyCachingCredentialsProvider` /// will cache those credentials until they expire. -/// -/// # Note -/// -/// This is __NOT__ production ready yet. Timeouts and panic safety have not been implemented yet. pub struct LazyCachingCredentialsProvider { time: Box, + sleep: Box, cache: Cache, refresh: Arc, - _refresh_timeout: Duration, + refresh_timeout: Duration, default_credential_expiration: Duration, } impl LazyCachingCredentialsProvider { fn new( time: impl TimeSource, + sleep: Box, refresh: Arc, refresh_timeout: Duration, default_credential_expiration: Duration, @@ -47,9 +47,10 @@ impl LazyCachingCredentialsProvider { ) -> Self { LazyCachingCredentialsProvider { time: Box::new(time), + sleep, cache: Cache::new(buffer_time), refresh, - _refresh_timeout: refresh_timeout, + refresh_timeout, default_credential_expiration, } } @@ -67,6 +68,8 @@ impl AsyncProvideCredentials for LazyCachingCredentialsProvider { { let now = self.time.now(); let refresh = self.refresh.clone(); + let timeout_future = self.sleep.sleep(self.refresh_timeout); + let refresh_timeout = self.refresh_timeout; let cache = self.cache.clone(); let default_credential_expiration = self.default_credential_expiration; @@ -80,11 +83,13 @@ impl AsyncProvideCredentials for LazyCachingCredentialsProvider { // since the futures are not eagerly executed, and the cache will only run one // of them. let span = trace_span!("lazy_refresh_credentials"); - let future = refresh.provide_credentials(); + let future = Timeout::new(refresh.provide_credentials(), timeout_future); cache .get_or_load(|| { async move { - let mut credentials = future.await?; + let mut credentials = future.await.map_err(|_| { + CredentialsError::ProviderTimedOut(refresh_timeout) + })??; // If the credentials don't have an expiration time, then create a default one if credentials.expiry().is_none() { *credentials.expiry_mut() = @@ -109,6 +114,7 @@ pub mod builder { }; use crate::provider::time::SystemTimeSource; use crate::provider::AsyncProvideCredentials; + use smithy_http::sleep::AsyncSleep; use std::sync::Arc; use std::time::Duration; @@ -122,8 +128,10 @@ pub mod builder { /// use aws_auth::provider::lazy_caching::LazyCachingCredentialsProvider; /// use std::sync::Arc; /// use std::time::Duration; + /// use smithy_http::sleep::TokioSleep; /// /// let provider = LazyCachingCredentialsProvider::builder() + /// .sleep_impl(TokioSleep::new()) /// .refresh(async_provide_credentials_fn(|| async { /// // An async process to retrieve credentials would go here: /// Ok(Credentials::from_keys("example", "example", None)) @@ -132,6 +140,7 @@ pub mod builder { /// ``` #[derive(Default)] pub struct Builder { + sleep: Option>, refresh: Option>, refresh_timeout: Option, buffer_time: Option, @@ -150,11 +159,20 @@ pub mod builder { self } + /// Implementation of [`AsyncSleep`] to use for timeouts. This enables use of + /// the `LazyCachingCredentialsProvider` with other async runtimes. + /// If using Tokio as the async runtime, this should be set to an instance of + /// [`TokioSleep`](smithy_http::sleep::TokioSleep). + pub fn sleep_impl(mut self, sleep: impl AsyncSleep + 'static) -> Self { + self.sleep = Some(Box::new(sleep)); + self + } + /// (Optional) Timeout for the given [`AsyncProvideCredentials`] implementation. /// Defaults to 5 seconds. pub fn refresh_timeout(mut self, timeout: Duration) -> Self { self.refresh_timeout = Some(timeout); - unimplemented!("refresh_timeout hasn't been implemented yet") + self } /// (Optional) Amount of time before the actual credential expiration time @@ -186,6 +204,7 @@ pub mod builder { ); LazyCachingCredentialsProvider::new( SystemTimeSource, + self.sleep.expect("sleep_impl is required"), self.refresh.expect("refresh provider is required"), self.refresh_timeout.unwrap_or(DEFAULT_REFRESH_TIMEOUT), self.buffer_time.unwrap_or(DEFAULT_BUFFER_TIME), @@ -205,6 +224,7 @@ mod tests { async_provide_credentials_fn, AsyncProvideCredentials, CredentialsError, CredentialsResult, }; use crate::Credentials; + use smithy_http::sleep::TokioSleep; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; use tracing::info; @@ -239,6 +259,7 @@ mod tests { let refresh_list = Arc::new(Mutex::new(refresh_list)); LazyCachingCredentialsProvider::new( time, + Box::new(TokioSleep::new()), Arc::new(async_provide_credentials_fn(move || { let list = refresh_list.clone(); async move { @@ -278,6 +299,7 @@ mod tests { })); let provider = LazyCachingCredentialsProvider::new( time, + Box::new(TokioSleep::new()), refresh, DEFAULT_REFRESH_TIMEOUT, DEFAULT_CREDENTIAL_EXPIRATION, @@ -337,6 +359,7 @@ mod tests { #[test_env_log::test] fn refresh_retrieve_contention() { let rt = tokio::runtime::Builder::new_multi_thread() + .enable_time() .worker_threads(16) .build() .unwrap(); @@ -377,4 +400,25 @@ mod tests { } } } + + #[test_env_log::test(tokio::test)] + async fn load_timeout() { + let time = TestTime::new(epoch_secs(100)); + let provider = LazyCachingCredentialsProvider::new( + time, + Box::new(TokioSleep::new()), + Arc::new(async_provide_credentials_fn(|| async { + tokio::time::sleep(Duration::from_millis(10)).await; + Ok(credentials(1000)) + })), + Duration::from_millis(5), + DEFAULT_CREDENTIAL_EXPIRATION, + DEFAULT_BUFFER_TIME, + ); + + assert!(matches!( + provider.provide_credentials().await, + Err(CredentialsError::ProviderTimedOut(_)) + )); + } } diff --git a/aws/rust-runtime/aws-auth/src/provider/timeout.rs b/aws/rust-runtime/aws-auth/src/provider/timeout.rs new file mode 100644 index 0000000000..245b9bbb12 --- /dev/null +++ b/aws/rust-runtime/aws-auth/src/provider/timeout.rs @@ -0,0 +1,107 @@ +/* + * Original Copyright (c) 2021 Tokio Contributors. Licensed under the Apache-2.0 license. + * Modifications Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +use pin_project::pin_project; +use std::error::Error; +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub struct TimedOutError; + +impl Error for TimedOutError {} + +impl fmt::Display for TimedOutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "timed out") + } +} + +#[pin_project] +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub struct Timeout { + #[pin] + value: T, + #[pin] + sleep: S, +} + +impl Timeout { + pub(crate) fn new(value: T, sleep: S) -> Timeout { + Timeout { value, sleep } + } +} + +impl Future for Timeout +where + T: Future, + S: Future, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + + // First, try polling the future + if let Poll::Ready(v) = me.value.poll(cx) { + return Poll::Ready(Ok(v)); + } + + // Now check the timer + match me.sleep.poll(cx) { + Poll::Ready(_) => Poll::Ready(Err(TimedOutError)), + Poll::Pending => Poll::Pending, + } + } +} + +#[cfg(test)] +mod tests { + use super::Timeout; + use crate::provider::timeout::TimedOutError; + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + + struct Never; + impl Future for Never { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + Poll::Pending + } + } + + #[tokio::test] + async fn success() { + assert_eq!( + Ok(Ok(5)), + Timeout::new(async { Ok::(5) }, Never).await + ); + } + + #[tokio::test] + async fn failure() { + assert_eq!( + Ok(Err(0)), + Timeout::new(async { Err::(0) }, Never).await + ); + } + + #[tokio::test] + async fn timeout() { + assert_eq!(Err(TimedOutError), Timeout::new(Never, async {}).await); + } + + // If the value is available at the same time as the timeout, then return the value + #[tokio::test] + async fn prefer_value_to_timeout() { + assert_eq!(Ok(5), Timeout::new(async { 5 }, async {}).await); + } +} diff --git a/aws/sdk/examples/sts/Cargo.toml b/aws/sdk/examples/sts/Cargo.toml index e0a61dc752..723ebc819c 100644 --- a/aws/sdk/examples/sts/Cargo.toml +++ b/aws/sdk/examples/sts/Cargo.toml @@ -9,6 +9,6 @@ edition = "2018" [dependencies] sts = { package = "aws-sdk-sts", path = "../../build/aws-sdk/sts" } dynamodb = { package = "aws-sdk-dynamodb", path = "../../build/aws-sdk/dynamodb"} -aws-auth = { package = "aws-auth", path = "../../build/aws-sdk/aws-auth" } +aws-auth = { package = "aws-auth", path = "../../build/aws-sdk/aws-auth", features = ["timeout-tokio"] } tokio = { version = "1", features = ["full"] } tracing-subscriber = "0.2.18" diff --git a/aws/sdk/examples/sts/src/bin/credentials-provider.rs b/aws/sdk/examples/sts/src/bin/credentials-provider.rs index 58a7698e64..d005c68ec9 100644 --- a/aws/sdk/examples/sts/src/bin/credentials-provider.rs +++ b/aws/sdk/examples/sts/src/bin/credentials-provider.rs @@ -4,6 +4,7 @@ */ use aws_auth::provider::lazy_caching::LazyCachingCredentialsProvider; +use aws_auth::provider::TokioSleep; use aws_auth::provider::{async_provide_credentials_fn, CredentialsError}; use sts::Credentials; @@ -14,9 +15,15 @@ async fn main() -> Result<(), dynamodb::Error> { tracing_subscriber::fmt::init(); let client = sts::Client::from_env(); - // NOTE: Do not use LazyCachingCredentialsProvider in production yet! - // It hasn't implemented timeout or panic safety yet. + // `LazyCachingCredentialsProvider` will load credentials if it doesn't have any non-expired + // credentials cached. See the docs on the builder for the various configuration options, + // such as timeouts, default expiration times, and more. let sts_provider = LazyCachingCredentialsProvider::builder() + // `LazyCachingCredentialsProvider` requires an implementation of async sleep so that + // it can be used with different async runtimes. The sleep function will be used for timeouts. + // Since we're using Tokio in this example, we use the `TokioSleep` implementation, which + // requires the `timeout-tokio` feature in the `aws-auth` dependency. + .sleep_impl(TokioSleep::new()) .refresh(async_provide_credentials_fn(move || { let client = client.clone(); async move { diff --git a/rust-runtime/smithy-http/Cargo.toml b/rust-runtime/smithy-http/Cargo.toml index 78bb8dc9bb..d9703350b3 100644 --- a/rust-runtime/smithy-http/Cargo.toml +++ b/rust-runtime/smithy-http/Cargo.toml @@ -7,6 +7,7 @@ license = "Apache-2.0" [features] bytestream-util = ["tokio/fs", "tokio-util/io"] +sleep-tokio = ["tokio/time"] default = ["bytestream-util"] [dependencies] diff --git a/rust-runtime/smithy-http/src/lib.rs b/rust-runtime/smithy-http/src/lib.rs index 92837e3a16..18ce806184 100644 --- a/rust-runtime/smithy-http/src/lib.rs +++ b/rust-runtime/smithy-http/src/lib.rs @@ -18,4 +18,5 @@ pub mod query; pub mod response; pub mod result; pub mod retry; +pub mod sleep; mod urlencode; diff --git a/rust-runtime/smithy-http/src/sleep.rs b/rust-runtime/smithy-http/src/sleep.rs new file mode 100644 index 0000000000..37a2180bc6 --- /dev/null +++ b/rust-runtime/smithy-http/src/sleep.rs @@ -0,0 +1,35 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +//! Provides an [`AsyncSleep`] trait that returns a future that sleeps for a given duration, +//! and implementations of `AsyncSleep` for different async runtimes. + +use std::future::Future; +use std::pin::Pin; +use std::time::Duration; + +/// Async trait with a `sleep` function. +pub trait AsyncSleep: Send + Sync { + /// Returns a future that sleeps for the given `duration` of time. + fn sleep(&self, duration: Duration) -> Pin + Send + '_>>; +} + +/// Implementation of [`AsyncSleep`] for Tokio. +#[cfg(feature = "sleep-tokio")] +pub struct TokioSleep; + +#[cfg(feature = "sleep-tokio")] +impl TokioSleep { + pub fn new() -> TokioSleep { + TokioSleep + } +} + +#[cfg(feature = "sleep-tokio")] +impl AsyncSleep for TokioSleep { + fn sleep(&self, duration: Duration) -> Pin + Send + '_>> { + Box::pin(tokio::time::sleep(duration)) + } +} From c43456aafc03fc213d74d0786acda5b7a127e416 Mon Sep 17 00:00:00 2001 From: John DiSanti Date: Tue, 20 Jul 2021 15:41:22 -0700 Subject: [PATCH 2/7] Rename refresh to reload --- .../aws-auth/src/provider/lazy_caching.rs | 92 +++++++++---------- .../sts/src/bin/credentials-provider.rs | 4 +- 2 files changed, 47 insertions(+), 49 deletions(-) diff --git a/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs b/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs index c516441d95..148239c497 100644 --- a/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs +++ b/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs @@ -15,12 +15,10 @@ use std::sync::Arc; use std::time::Duration; use tracing::{trace_span, Instrument}; -const DEFAULT_REFRESH_TIMEOUT: Duration = Duration::from_secs(5); +const DEFAULT_LOAD_TIMEOUT: Duration = Duration::from_secs(5); const DEFAULT_CREDENTIAL_EXPIRATION: Duration = Duration::from_secs(15 * 60); const DEFAULT_BUFFER_TIME: Duration = Duration::from_secs(10); -// TODO: Rename refresh to load - /// `LazyCachingCredentialsProvider` implements [`AsyncProvideCredentials`] by caching /// credentials that it loads by calling a user-provided [`AsyncProvideCredentials`] implementation. /// @@ -29,28 +27,28 @@ const DEFAULT_BUFFER_TIME: Duration = Duration::from_secs(10); /// will cache those credentials until they expire. pub struct LazyCachingCredentialsProvider { time: Box, - sleep: Box, + sleeper: Box, cache: Cache, - refresh: Arc, - refresh_timeout: Duration, + loader: Arc, + load_timeout: Duration, default_credential_expiration: Duration, } impl LazyCachingCredentialsProvider { fn new( time: impl TimeSource, - sleep: Box, - refresh: Arc, - refresh_timeout: Duration, + sleeper: Box, + loader: Arc, + load_timeout: Duration, default_credential_expiration: Duration, buffer_time: Duration, ) -> Self { LazyCachingCredentialsProvider { time: Box::new(time), - sleep, + sleeper, cache: Cache::new(buffer_time), - refresh, - refresh_timeout, + loader, + load_timeout, default_credential_expiration, } } @@ -67,9 +65,9 @@ impl AsyncProvideCredentials for LazyCachingCredentialsProvider { Self: 'a, { let now = self.time.now(); - let refresh = self.refresh.clone(); - let timeout_future = self.sleep.sleep(self.refresh_timeout); - let refresh_timeout = self.refresh_timeout; + let loader = self.loader.clone(); + let timeout_future = self.sleeper.sleep(self.load_timeout); + let load_timeout = self.load_timeout; let cache = self.cache.clone(); let default_credential_expiration = self.default_credential_expiration; @@ -78,18 +76,18 @@ impl AsyncProvideCredentials for LazyCachingCredentialsProvider { if let Some(credentials) = cache.yield_or_clear_if_expired(now).await { Ok(credentials) } else { - // If we didn't get credentials from the cache, then we need to try and refresh. - // There may be other threads also refreshing simultaneously, but this is OK + // If we didn't get credentials from the cache, then we need to try and load. + // There may be other threads also loading simultaneously, but this is OK // since the futures are not eagerly executed, and the cache will only run one // of them. - let span = trace_span!("lazy_refresh_credentials"); - let future = Timeout::new(refresh.provide_credentials(), timeout_future); + let span = trace_span!("lazy_load_credentials"); + let future = Timeout::new(loader.provide_credentials(), timeout_future); cache .get_or_load(|| { async move { - let mut credentials = future.await.map_err(|_| { - CredentialsError::ProviderTimedOut(refresh_timeout) - })??; + let mut credentials = future + .await + .map_err(|_| CredentialsError::ProviderTimedOut(load_timeout))??; // If the credentials don't have an expiration time, then create a default one if credentials.expiry().is_none() { *credentials.expiry_mut() = @@ -97,7 +95,7 @@ impl AsyncProvideCredentials for LazyCachingCredentialsProvider { } Ok(credentials) } - // Only instrument the the actual refreshing future so that no span + // Only instrument the the actual load future so that no span // is opened if the cache decides not to execute it. .instrument(span) }) @@ -110,7 +108,7 @@ impl AsyncProvideCredentials for LazyCachingCredentialsProvider { pub mod builder { use crate::provider::lazy_caching::{ LazyCachingCredentialsProvider, DEFAULT_BUFFER_TIME, DEFAULT_CREDENTIAL_EXPIRATION, - DEFAULT_REFRESH_TIMEOUT, + DEFAULT_LOAD_TIMEOUT, }; use crate::provider::time::SystemTimeSource; use crate::provider::AsyncProvideCredentials; @@ -131,8 +129,8 @@ pub mod builder { /// use smithy_http::sleep::TokioSleep; /// /// let provider = LazyCachingCredentialsProvider::builder() - /// .sleep_impl(TokioSleep::new()) - /// .refresh(async_provide_credentials_fn(|| async { + /// .sleeper(TokioSleep::new()) + /// .loader(async_provide_credentials_fn(|| async { /// // An async process to retrieve credentials would go here: /// Ok(Credentials::from_keys("example", "example", None)) /// })) @@ -141,8 +139,8 @@ pub mod builder { #[derive(Default)] pub struct Builder { sleep: Option>, - refresh: Option>, - refresh_timeout: Option, + loader: Option>, + load_timeout: Option, buffer_time: Option, default_credential_expiration: Option, } @@ -152,10 +150,10 @@ pub mod builder { Default::default() } - /// An implementation of [`AsyncProvideCredentials`] that will be used to refresh + /// An implementation of [`AsyncProvideCredentials`] that will be used to load /// the cached credentials once they're expired. - pub fn refresh(mut self, refresh: impl AsyncProvideCredentials + 'static) -> Self { - self.refresh = Some(Arc::new(refresh)); + pub fn loader(mut self, loader: impl AsyncProvideCredentials + 'static) -> Self { + self.loader = Some(Arc::new(loader)); self } @@ -163,15 +161,15 @@ pub mod builder { /// the `LazyCachingCredentialsProvider` with other async runtimes. /// If using Tokio as the async runtime, this should be set to an instance of /// [`TokioSleep`](smithy_http::sleep::TokioSleep). - pub fn sleep_impl(mut self, sleep: impl AsyncSleep + 'static) -> Self { + pub fn sleeper(mut self, sleep: impl AsyncSleep + 'static) -> Self { self.sleep = Some(Box::new(sleep)); self } /// (Optional) Timeout for the given [`AsyncProvideCredentials`] implementation. /// Defaults to 5 seconds. - pub fn refresh_timeout(mut self, timeout: Duration) -> Self { - self.refresh_timeout = Some(timeout); + pub fn load_timeout(mut self, timeout: Duration) -> Self { + self.load_timeout = Some(timeout); self } @@ -205,8 +203,8 @@ pub mod builder { LazyCachingCredentialsProvider::new( SystemTimeSource, self.sleep.expect("sleep_impl is required"), - self.refresh.expect("refresh provider is required"), - self.refresh_timeout.unwrap_or(DEFAULT_REFRESH_TIMEOUT), + self.loader.expect("loader is required"), + self.load_timeout.unwrap_or(DEFAULT_LOAD_TIMEOUT), self.buffer_time.unwrap_or(DEFAULT_BUFFER_TIME), default_credential_expiration, ) @@ -218,7 +216,7 @@ pub mod builder { mod tests { use crate::provider::lazy_caching::{ LazyCachingCredentialsProvider, TimeSource, DEFAULT_BUFFER_TIME, - DEFAULT_CREDENTIAL_EXPIRATION, DEFAULT_REFRESH_TIMEOUT, + DEFAULT_CREDENTIAL_EXPIRATION, DEFAULT_LOAD_TIMEOUT, }; use crate::provider::{ async_provide_credentials_fn, AsyncProvideCredentials, CredentialsError, CredentialsResult, @@ -254,21 +252,21 @@ mod tests { fn test_provider( time: T, - refresh_list: Vec, + load_list: Vec, ) -> LazyCachingCredentialsProvider { - let refresh_list = Arc::new(Mutex::new(refresh_list)); + let load_list = Arc::new(Mutex::new(load_list)); LazyCachingCredentialsProvider::new( time, Box::new(TokioSleep::new()), Arc::new(async_provide_credentials_fn(move || { - let list = refresh_list.clone(); + let list = load_list.clone(); async move { let next = list.lock().unwrap().remove(0); info!("refreshing the credentials to {:?}", next); next } })), - DEFAULT_REFRESH_TIMEOUT, + DEFAULT_LOAD_TIMEOUT, DEFAULT_CREDENTIAL_EXPIRATION, DEFAULT_BUFFER_TIME, ) @@ -293,15 +291,15 @@ mod tests { #[test_env_log::test(tokio::test)] async fn initial_populate_credentials() { let time = TestTime::new(epoch_secs(100)); - let refresh = Arc::new(async_provide_credentials_fn(|| async { + let loader = Arc::new(async_provide_credentials_fn(|| async { info!("refreshing the credentials"); Ok(credentials(1000)) })); let provider = LazyCachingCredentialsProvider::new( time, Box::new(TokioSleep::new()), - refresh, - DEFAULT_REFRESH_TIMEOUT, + loader, + DEFAULT_LOAD_TIMEOUT, DEFAULT_CREDENTIAL_EXPIRATION, DEFAULT_BUFFER_TIME, ); @@ -317,7 +315,7 @@ mod tests { } #[test_env_log::test(tokio::test)] - async fn refresh_expired_credentials() { + async fn reload_expired_credentials() { let time = TestTime::new(epoch_secs(100)); let time_inner = time.time.clone(); let provider = test_provider( @@ -340,7 +338,7 @@ mod tests { } #[test_env_log::test(tokio::test)] - async fn refresh_failed_error() { + async fn load_failed_error() { let time = TestTime::new(epoch_secs(100)); let time_inner = time.time.clone(); let provider = test_provider( @@ -357,7 +355,7 @@ mod tests { } #[test_env_log::test] - fn refresh_retrieve_contention() { + fn load_contention() { let rt = tokio::runtime::Builder::new_multi_thread() .enable_time() .worker_threads(16) diff --git a/aws/sdk/examples/sts/src/bin/credentials-provider.rs b/aws/sdk/examples/sts/src/bin/credentials-provider.rs index d005c68ec9..b60c64dd04 100644 --- a/aws/sdk/examples/sts/src/bin/credentials-provider.rs +++ b/aws/sdk/examples/sts/src/bin/credentials-provider.rs @@ -23,8 +23,8 @@ async fn main() -> Result<(), dynamodb::Error> { // it can be used with different async runtimes. The sleep function will be used for timeouts. // Since we're using Tokio in this example, we use the `TokioSleep` implementation, which // requires the `timeout-tokio` feature in the `aws-auth` dependency. - .sleep_impl(TokioSleep::new()) - .refresh(async_provide_credentials_fn(move || { + .sleeper(TokioSleep::new()) + .loader(async_provide_credentials_fn(move || { let client = client.clone(); async move { let session_token = client From 49ff14e5d02116b3464a6a1022bd71ffb810d707 Mon Sep 17 00:00:00 2001 From: John DiSanti Date: Tue, 20 Jul 2021 15:58:59 -0700 Subject: [PATCH 3/7] Update CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index be5e7de3a9..e90b79e098 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## vNext (Month Day Year) **New This Week** +- :tada: Add LazyCachingCredentialsProvider to aws-auth for use with expiring credentials, such as STS AssumeRole. Update STS example to use this new provider (#578, #595) - :bug: Bugfix: Fix parsing bug where whitespace was stripped when parsing XML (#590) ## v0.15 (June 29th 2021) From 4c6d36d61904016efde88e8acf79c0ac6a51a000 Mon Sep 17 00:00:00 2001 From: John DiSanti Date: Tue, 20 Jul 2021 16:44:45 -0700 Subject: [PATCH 4/7] Fix clippy --- rust-runtime/smithy-http/src/sleep.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rust-runtime/smithy-http/src/sleep.rs b/rust-runtime/smithy-http/src/sleep.rs index 37a2180bc6..9d3ec1e052 100644 --- a/rust-runtime/smithy-http/src/sleep.rs +++ b/rust-runtime/smithy-http/src/sleep.rs @@ -18,12 +18,13 @@ pub trait AsyncSleep: Send + Sync { /// Implementation of [`AsyncSleep`] for Tokio. #[cfg(feature = "sleep-tokio")] +#[derive(Default)] pub struct TokioSleep; #[cfg(feature = "sleep-tokio")] impl TokioSleep { pub fn new() -> TokioSleep { - TokioSleep + Default::default() } } From f2ed3312c2654b6013bca7b772ca0f72338a8fb3 Mon Sep 17 00:00:00 2001 From: John DiSanti Date: Thu, 22 Jul 2021 10:57:08 -0700 Subject: [PATCH 5/7] CR feedback --- aws/rust-runtime/aws-auth/Cargo.toml | 7 +- aws/rust-runtime/aws-auth/src/provider.rs | 6 -- .../aws-auth/src/provider/lazy_caching.rs | 29 ++++---- aws/sdk/build.gradle.kts | 1 + aws/sdk/examples/sts/Cargo.toml | 2 +- .../sts/src/bin/credentials-provider.rs | 8 +- rust-runtime/smithy-async/Cargo.toml | 16 ++++ rust-runtime/smithy-async/src/future/mod.rs | 9 +++ rust-runtime/smithy-async/src/future/never.rs | 29 ++++++++ .../smithy-async/src/future}/timeout.rs | 73 ++++++++++++------- rust-runtime/smithy-async/src/lib.rs | 12 +++ rust-runtime/smithy-async/src/rt/mod.rs | 8 ++ rust-runtime/smithy-async/src/rt/sleep.rs | 72 ++++++++++++++++++ rust-runtime/smithy-http/Cargo.toml | 1 - rust-runtime/smithy-http/src/lib.rs | 1 - rust-runtime/smithy-http/src/sleep.rs | 36 --------- 16 files changed, 214 insertions(+), 96 deletions(-) create mode 100644 rust-runtime/smithy-async/Cargo.toml create mode 100644 rust-runtime/smithy-async/src/future/mod.rs create mode 100644 rust-runtime/smithy-async/src/future/never.rs rename {aws/rust-runtime/aws-auth/src/provider => rust-runtime/smithy-async/src/future}/timeout.rs (50%) create mode 100644 rust-runtime/smithy-async/src/lib.rs create mode 100644 rust-runtime/smithy-async/src/rt/mod.rs create mode 100644 rust-runtime/smithy-async/src/rt/sleep.rs delete mode 100644 rust-runtime/smithy-http/src/sleep.rs diff --git a/aws/rust-runtime/aws-auth/Cargo.toml b/aws/rust-runtime/aws-auth/Cargo.toml index 643a7e36c6..63ab3c0239 100644 --- a/aws/rust-runtime/aws-auth/Cargo.toml +++ b/aws/rust-runtime/aws-auth/Cargo.toml @@ -6,12 +6,13 @@ license = "Apache-2.0" edition = "2018" [features] -timeout-tokio = ["smithy-http/sleep-tokio"] -default = [] +rt-tokio = ["smithy-async/rt-tokio"] +default = ["rt-tokio"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] pin-project = "1" +smithy-async = { path = "../../../rust-runtime/smithy-async", default-features = false } smithy-http = { path = "../../../rust-runtime/smithy-http" } tokio = { version = "1", features = ["sync"] } tracing = "0.1.25" @@ -24,4 +25,4 @@ http = "0.2.3" test-env-log = { version = "0.2.7", features = ["trace"] } tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "test-util"] } tracing-subscriber = { version = "0.2.16", features = ["fmt"] } -smithy-http = { path = "../../../rust-runtime/smithy-http", features = ["sleep-tokio"] } +smithy-async = { path = "../../../rust-runtime/smithy-async", features = ["rt-tokio"] } diff --git a/aws/rust-runtime/aws-auth/src/provider.rs b/aws/rust-runtime/aws-auth/src/provider.rs index 181dc03451..49d4692a40 100644 --- a/aws/rust-runtime/aws-auth/src/provider.rs +++ b/aws/rust-runtime/aws-auth/src/provider.rs @@ -17,7 +17,6 @@ mod cache; pub mod env; pub mod lazy_caching; mod time; -mod timeout; use crate::Credentials; use smithy_http::property_bag::PropertyBag; @@ -29,11 +28,6 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -// Re-export AsyncSleep and TokioSleep so that an explicit dependency on smithy-http isn't needed -pub use smithy_http::sleep::AsyncSleep; -#[cfg(feature = "timeout-tokio")] -pub use smithy_http::sleep::TokioSleep; - #[derive(Debug)] #[non_exhaustive] pub enum CredentialsError { diff --git a/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs b/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs index 148239c497..f1c69ad367 100644 --- a/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs +++ b/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs @@ -7,10 +7,9 @@ use crate::provider::cache::Cache; use crate::provider::time::TimeSource; -use crate::provider::timeout::Timeout; -use crate::provider::{ - AsyncProvideCredentials, AsyncSleep, BoxFuture, CredentialsError, CredentialsResult, -}; +use crate::provider::{AsyncProvideCredentials, BoxFuture, CredentialsError, CredentialsResult}; +use smithy_async::future::timeout::Timeout; +use smithy_async::rt::sleep::AsyncSleep; use std::sync::Arc; use std::time::Duration; use tracing::{trace_span, Instrument}; @@ -112,7 +111,7 @@ pub mod builder { }; use crate::provider::time::SystemTimeSource; use crate::provider::AsyncProvideCredentials; - use smithy_http::sleep::AsyncSleep; + use smithy_async::rt::sleep::{default_async_sleep, AsyncSleep}; use std::sync::Arc; use std::time::Duration; @@ -126,11 +125,9 @@ pub mod builder { /// use aws_auth::provider::lazy_caching::LazyCachingCredentialsProvider; /// use std::sync::Arc; /// use std::time::Duration; - /// use smithy_http::sleep::TokioSleep; /// /// let provider = LazyCachingCredentialsProvider::builder() - /// .sleeper(TokioSleep::new()) - /// .loader(async_provide_credentials_fn(|| async { + /// .load(async_provide_credentials_fn(|| async { /// // An async process to retrieve credentials would go here: /// Ok(Credentials::from_keys("example", "example", None)) /// })) @@ -139,7 +136,7 @@ pub mod builder { #[derive(Default)] pub struct Builder { sleep: Option>, - loader: Option>, + load: Option>, load_timeout: Option, buffer_time: Option, default_credential_expiration: Option, @@ -152,8 +149,8 @@ pub mod builder { /// An implementation of [`AsyncProvideCredentials`] that will be used to load /// the cached credentials once they're expired. - pub fn loader(mut self, loader: impl AsyncProvideCredentials + 'static) -> Self { - self.loader = Some(Arc::new(loader)); + pub fn load(mut self, loader: impl AsyncProvideCredentials + 'static) -> Self { + self.load = Some(Arc::new(loader)); self } @@ -161,7 +158,7 @@ pub mod builder { /// the `LazyCachingCredentialsProvider` with other async runtimes. /// If using Tokio as the async runtime, this should be set to an instance of /// [`TokioSleep`](smithy_http::sleep::TokioSleep). - pub fn sleeper(mut self, sleep: impl AsyncSleep + 'static) -> Self { + pub fn sleep(mut self, sleep: impl AsyncSleep + 'static) -> Self { self.sleep = Some(Box::new(sleep)); self } @@ -202,8 +199,10 @@ pub mod builder { ); LazyCachingCredentialsProvider::new( SystemTimeSource, - self.sleep.expect("sleep_impl is required"), - self.loader.expect("loader is required"), + self.sleep.unwrap_or_else(|| { + default_async_sleep().expect("no default sleep implementation available") + }), + self.load.expect("load implementation is required"), self.load_timeout.unwrap_or(DEFAULT_LOAD_TIMEOUT), self.buffer_time.unwrap_or(DEFAULT_BUFFER_TIME), default_credential_expiration, @@ -222,7 +221,7 @@ mod tests { async_provide_credentials_fn, AsyncProvideCredentials, CredentialsError, CredentialsResult, }; use crate::Credentials; - use smithy_http::sleep::TokioSleep; + use smithy_async::rt::sleep::TokioSleep; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; use tracing::info; diff --git a/aws/sdk/build.gradle.kts b/aws/sdk/build.gradle.kts index aea1832ebf..71a8d84125 100644 --- a/aws/sdk/build.gradle.kts +++ b/aws/sdk/build.gradle.kts @@ -20,6 +20,7 @@ val smithyVersion: String by project val sdkOutputDir = buildDir.resolve("aws-sdk") val runtimeModules = listOf( + "smithy-async", "smithy-types", "smithy-json", "smithy-query", diff --git a/aws/sdk/examples/sts/Cargo.toml b/aws/sdk/examples/sts/Cargo.toml index 723ebc819c..e0a61dc752 100644 --- a/aws/sdk/examples/sts/Cargo.toml +++ b/aws/sdk/examples/sts/Cargo.toml @@ -9,6 +9,6 @@ edition = "2018" [dependencies] sts = { package = "aws-sdk-sts", path = "../../build/aws-sdk/sts" } dynamodb = { package = "aws-sdk-dynamodb", path = "../../build/aws-sdk/dynamodb"} -aws-auth = { package = "aws-auth", path = "../../build/aws-sdk/aws-auth", features = ["timeout-tokio"] } +aws-auth = { package = "aws-auth", path = "../../build/aws-sdk/aws-auth" } tokio = { version = "1", features = ["full"] } tracing-subscriber = "0.2.18" diff --git a/aws/sdk/examples/sts/src/bin/credentials-provider.rs b/aws/sdk/examples/sts/src/bin/credentials-provider.rs index b60c64dd04..79cfd03bcc 100644 --- a/aws/sdk/examples/sts/src/bin/credentials-provider.rs +++ b/aws/sdk/examples/sts/src/bin/credentials-provider.rs @@ -4,7 +4,6 @@ */ use aws_auth::provider::lazy_caching::LazyCachingCredentialsProvider; -use aws_auth::provider::TokioSleep; use aws_auth::provider::{async_provide_credentials_fn, CredentialsError}; use sts::Credentials; @@ -19,12 +18,7 @@ async fn main() -> Result<(), dynamodb::Error> { // credentials cached. See the docs on the builder for the various configuration options, // such as timeouts, default expiration times, and more. let sts_provider = LazyCachingCredentialsProvider::builder() - // `LazyCachingCredentialsProvider` requires an implementation of async sleep so that - // it can be used with different async runtimes. The sleep function will be used for timeouts. - // Since we're using Tokio in this example, we use the `TokioSleep` implementation, which - // requires the `timeout-tokio` feature in the `aws-auth` dependency. - .sleeper(TokioSleep::new()) - .loader(async_provide_credentials_fn(move || { + .load(async_provide_credentials_fn(move || { let client = client.clone(); async move { let session_token = client diff --git a/rust-runtime/smithy-async/Cargo.toml b/rust-runtime/smithy-async/Cargo.toml new file mode 100644 index 0000000000..1a8ca1e901 --- /dev/null +++ b/rust-runtime/smithy-async/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "smithy-async" +version = "0.1.0" +authors = ["AWS Rust SDK Team ", "John DiSanti "] +edition = "2018" + +[features] +rt-tokio = ["tokio"] +default = ["rt-tokio"] + +[dependencies] +pin-project-lite = "0.2" +tokio = { version = "1.6", features = ["time"], optional = true } + +[dev-dependencies] +tokio = { version = "1.6", features = ["rt", "macros"] } diff --git a/rust-runtime/smithy-async/src/future/mod.rs b/rust-runtime/smithy-async/src/future/mod.rs new file mode 100644 index 0000000000..6f06b1f376 --- /dev/null +++ b/rust-runtime/smithy-async/src/future/mod.rs @@ -0,0 +1,9 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +//! Useful runtime-agnostic future implementations. + +pub mod never; +pub mod timeout; diff --git a/rust-runtime/smithy-async/src/future/never.rs b/rust-runtime/smithy-async/src/future/never.rs new file mode 100644 index 0000000000..1454f71174 --- /dev/null +++ b/rust-runtime/smithy-async/src/future/never.rs @@ -0,0 +1,29 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +//! Provides the [`Never`] future that never completes. + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Future that never completes. +#[non_exhaustive] +#[derive(Default)] +pub struct Never; + +impl Never { + pub fn new() -> Never { + Default::default() + } +} + +impl Future for Never { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + Poll::Pending + } +} diff --git a/aws/rust-runtime/aws-auth/src/provider/timeout.rs b/rust-runtime/smithy-async/src/future/timeout.rs similarity index 50% rename from aws/rust-runtime/aws-auth/src/provider/timeout.rs rename to rust-runtime/smithy-async/src/future/timeout.rs index 245b9bbb12..0da0dfbff2 100644 --- a/aws/rust-runtime/aws-auth/src/provider/timeout.rs +++ b/rust-runtime/smithy-async/src/future/timeout.rs @@ -1,10 +1,41 @@ /* - * Original Copyright (c) 2021 Tokio Contributors. Licensed under the Apache-2.0 license. - * Modifications Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ -use pin_project::pin_project; +// This code was copied and then modified from Tokio. + +/* + * Copyright (c) 2021 Tokio Contributors + * + * Permission is hereby granted, free of charge, to any + * person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the + * Software without restriction, including without + * limitation the rights to use, copy, modify, merge, + * publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software + * is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice + * shall be included in all copies or substantial portions + * of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF + * ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED + * TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT + * SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY + * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR + * IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +//! Provides the [`Timeout`] future for adding a timeout to another future. + +use pin_project_lite::pin_project; use std::error::Error; use std::fmt; use std::future::Future; @@ -22,18 +53,20 @@ impl fmt::Display for TimedOutError { } } -#[pin_project] -#[must_use = "futures do nothing unless you `.await` or poll them"] -#[derive(Debug)] -pub struct Timeout { - #[pin] - value: T, - #[pin] - sleep: S, +pin_project! { + #[non_exhaustive] + #[must_use = "futures do nothing unless you `.await` or poll them"] + #[derive(Debug)] + pub struct Timeout { + #[pin] + value: T, + #[pin] + sleep: S, + } } impl Timeout { - pub(crate) fn new(value: T, sleep: S) -> Timeout { + pub fn new(value: T, sleep: S) -> Timeout { Timeout { value, sleep } } } @@ -63,20 +96,8 @@ where #[cfg(test)] mod tests { - use super::Timeout; - use crate::provider::timeout::TimedOutError; - use std::future::Future; - use std::pin::Pin; - use std::task::{Context, Poll}; - - struct Never; - impl Future for Never { - type Output = (); - - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - Poll::Pending - } - } + use super::{TimedOutError, Timeout}; + use crate::future::never::Never; #[tokio::test] async fn success() { diff --git a/rust-runtime/smithy-async/src/lib.rs b/rust-runtime/smithy-async/src/lib.rs new file mode 100644 index 0000000000..f6a8dfb5c1 --- /dev/null +++ b/rust-runtime/smithy-async/src/lib.rs @@ -0,0 +1,12 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +//! Future utilities and runtime-agnostic abstractions for smithy-rs. +//! +//! Async runtime specific code is abstracted behind async traits, and implementations are +//! provided via feature flag. For now, only Tokio runtime implementations are provided. + +pub mod future; +pub mod rt; diff --git a/rust-runtime/smithy-async/src/rt/mod.rs b/rust-runtime/smithy-async/src/rt/mod.rs new file mode 100644 index 0000000000..a8966e1610 --- /dev/null +++ b/rust-runtime/smithy-async/src/rt/mod.rs @@ -0,0 +1,8 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +//! Async runtime agnostic traits and implementations. + +pub mod sleep; diff --git a/rust-runtime/smithy-async/src/rt/sleep.rs b/rust-runtime/smithy-async/src/rt/sleep.rs new file mode 100644 index 0000000000..4ac7dceb82 --- /dev/null +++ b/rust-runtime/smithy-async/src/rt/sleep.rs @@ -0,0 +1,72 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +//! Provides an [`AsyncSleep`] trait that returns a future that sleeps for a given duration, +//! and implementations of `AsyncSleep` for different async runtimes. + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +/// Async trait with a `sleep` function. +pub trait AsyncSleep: std::fmt::Debug + Send + Sync { + /// Returns a future that sleeps for the given `duration` of time. + fn sleep(&self, duration: Duration) -> Sleep; +} + +/// Returns a default sleep implementation based on the features enabled, or `None` if +/// there isn't one available from this crate. +pub fn default_async_sleep() -> Option> { + sleep_tokio() +} + +/// Future returned by [`AsyncSleep`]. +#[non_exhaustive] +pub struct Sleep(Pin + Send + 'static>>); + +impl Sleep { + fn new(future: impl Future + Send + 'static) -> Sleep { + Sleep(Box::pin(future)) + } +} + +impl Future for Sleep { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.0.as_mut().poll(cx) + } +} + +/// Implementation of [`AsyncSleep`] for Tokio. +#[non_exhaustive] +#[cfg(feature = "rt-tokio")] +#[derive(Debug, Default)] +pub struct TokioSleep; + +#[cfg(feature = "rt-tokio")] +impl TokioSleep { + pub fn new() -> TokioSleep { + Default::default() + } +} + +#[cfg(feature = "rt-tokio")] +impl AsyncSleep for TokioSleep { + fn sleep(&self, duration: Duration) -> Sleep { + Sleep::new(tokio::time::sleep(duration)) + } +} + +#[cfg(feature = "rt-tokio")] +fn sleep_tokio() -> Option> { + Some(Box::new(TokioSleep::new())) +} + +#[cfg(not(feature = "rt-tokio"))] +fn sleep_tokio() -> Option> { + None +} diff --git a/rust-runtime/smithy-http/Cargo.toml b/rust-runtime/smithy-http/Cargo.toml index d9703350b3..78bb8dc9bb 100644 --- a/rust-runtime/smithy-http/Cargo.toml +++ b/rust-runtime/smithy-http/Cargo.toml @@ -7,7 +7,6 @@ license = "Apache-2.0" [features] bytestream-util = ["tokio/fs", "tokio-util/io"] -sleep-tokio = ["tokio/time"] default = ["bytestream-util"] [dependencies] diff --git a/rust-runtime/smithy-http/src/lib.rs b/rust-runtime/smithy-http/src/lib.rs index 18ce806184..92837e3a16 100644 --- a/rust-runtime/smithy-http/src/lib.rs +++ b/rust-runtime/smithy-http/src/lib.rs @@ -18,5 +18,4 @@ pub mod query; pub mod response; pub mod result; pub mod retry; -pub mod sleep; mod urlencode; diff --git a/rust-runtime/smithy-http/src/sleep.rs b/rust-runtime/smithy-http/src/sleep.rs deleted file mode 100644 index 9d3ec1e052..0000000000 --- a/rust-runtime/smithy-http/src/sleep.rs +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ - -//! Provides an [`AsyncSleep`] trait that returns a future that sleeps for a given duration, -//! and implementations of `AsyncSleep` for different async runtimes. - -use std::future::Future; -use std::pin::Pin; -use std::time::Duration; - -/// Async trait with a `sleep` function. -pub trait AsyncSleep: Send + Sync { - /// Returns a future that sleeps for the given `duration` of time. - fn sleep(&self, duration: Duration) -> Pin + Send + '_>>; -} - -/// Implementation of [`AsyncSleep`] for Tokio. -#[cfg(feature = "sleep-tokio")] -#[derive(Default)] -pub struct TokioSleep; - -#[cfg(feature = "sleep-tokio")] -impl TokioSleep { - pub fn new() -> TokioSleep { - Default::default() - } -} - -#[cfg(feature = "sleep-tokio")] -impl AsyncSleep for TokioSleep { - fn sleep(&self, duration: Duration) -> Pin + Send + '_>> { - Box::pin(tokio::time::sleep(duration)) - } -} From e4a27e0d647fe0168cd438efc7245d89acecb0c3 Mon Sep 17 00:00:00 2001 From: John DiSanti Date: Mon, 2 Aug 2021 10:51:59 -0700 Subject: [PATCH 6/7] Add note about panic on `LazyCachedCredentialsProvider` builder --- aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs b/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs index f1c69ad367..50f59664af 100644 --- a/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs +++ b/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs @@ -189,6 +189,11 @@ pub mod builder { } /// Creates the [`LazyCachingCredentialsProvider`]. + /// + /// ## Note: + /// This will panic if no `sleep` implementation is given and if no default crate features + /// are used. By default, the [`TokioSleep`](smithy_http::sleep::TokioSleep) + /// implementation will be set automatically. pub fn build(self) -> LazyCachingCredentialsProvider { let default_credential_expiration = self .default_credential_expiration From 85ee3cb97fc53a344da4aac6294b9173ded2da2c Mon Sep 17 00:00:00 2001 From: John DiSanti Date: Mon, 2 Aug 2021 14:29:56 -0700 Subject: [PATCH 7/7] Fix doc comment code reference --- aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs b/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs index 50f59664af..f711437c94 100644 --- a/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs +++ b/aws/rust-runtime/aws-auth/src/provider/lazy_caching.rs @@ -157,7 +157,7 @@ pub mod builder { /// Implementation of [`AsyncSleep`] to use for timeouts. This enables use of /// the `LazyCachingCredentialsProvider` with other async runtimes. /// If using Tokio as the async runtime, this should be set to an instance of - /// [`TokioSleep`](smithy_http::sleep::TokioSleep). + /// [`TokioSleep`](smithy_async::rt::sleep::TokioSleep). pub fn sleep(mut self, sleep: impl AsyncSleep + 'static) -> Self { self.sleep = Some(Box::new(sleep)); self @@ -192,7 +192,7 @@ pub mod builder { /// /// ## Note: /// This will panic if no `sleep` implementation is given and if no default crate features - /// are used. By default, the [`TokioSleep`](smithy_http::sleep::TokioSleep) + /// are used. By default, the [`TokioSleep`](smithy_async::rt::sleep::TokioSleep) /// implementation will be set automatically. pub fn build(self) -> LazyCachingCredentialsProvider { let default_credential_expiration = self