Skip to content

Commit

Permalink
Integrate TimeSource into the orchestrator and middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
rcoh committed May 25, 2023
1 parent 7ccac06 commit d0a8f56
Show file tree
Hide file tree
Showing 18 changed files with 210 additions and 63 deletions.
11 changes: 11 additions & 0 deletions aws/rust-runtime/aws-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ mod loader {
use aws_credential_types::cache::CredentialsCache;
use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep};
use aws_smithy_async::time::{SharedTimeSource, TimeSource};
use aws_smithy_client::http_connector::HttpConnector;
use aws_smithy_types::retry::RetryConfig;
use aws_smithy_types::timeout::TimeoutConfig;
Expand Down Expand Up @@ -192,6 +193,7 @@ mod loader {
profile_files_override: Option<ProfileFiles>,
use_fips: Option<bool>,
use_dual_stack: Option<bool>,
time_source: Option<SharedTimeSource>,
}

impl ConfigLoader {
Expand Down Expand Up @@ -262,6 +264,12 @@ mod loader {
self
}

/// Set the time source used for tasks like signing requests
pub fn time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
self.time_source = Some(SharedTimeSource::new(time_source));
self
}

/// Override the [`HttpConnector`] for this [`ConfigLoader`]. The connector will be used when
/// sending operations. This **does not set** the HTTP connector used by config providers.
/// To change that connector, use [ConfigLoader::configure].
Expand Down Expand Up @@ -588,12 +596,15 @@ mod loader {
SharedCredentialsProvider::new(builder.build().await)
};

let ts = self.time_source.unwrap_or_default();

let mut builder = SdkConfig::builder()
.region(region)
.retry_config(retry_config)
.timeout_config(timeout_config)
.credentials_cache(credentials_cache)
.credentials_provider(credentials_provider)
.time_source(ts)
.http_connector(http_connector);

builder.set_app_name(app_name);
Expand Down
2 changes: 1 addition & 1 deletion aws/rust-runtime/aws-runtime/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ pub mod sigv4 {
) -> Result<(), BoxError> {
let operation_config =
Self::extract_operation_config(auth_scheme_endpoint_config, config_bag)?;
let request_time = config_bag.request_time().unwrap_or_default().system_time();
let request_time = config_bag.request_time().unwrap_or_default().now();

let credentials = if let Some(creds) = identity.data::<Credentials>() {
creds
Expand Down
4 changes: 3 additions & 1 deletion aws/rust-runtime/aws-sig-auth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@ aws-credential-types = { path = "../aws-credential-types" }
aws-sigv4 = { path = "../aws-sigv4" }
aws-smithy-eventstream = { path = "../../../rust-runtime/aws-smithy-eventstream", optional = true }
aws-smithy-http = { path = "../../../rust-runtime/aws-smithy-http" }
aws-smithy-async = { path = "../../../rust-runtime/aws-smithy-async" }
aws-types = { path = "../aws-types" }
http = "0.2.2"
tracing = "0.1"

[dev-dependencies]
aws-credential-types = { path = "../aws-credential-types", features = ["test-util"] }
aws-endpoint = { path = "../aws-endpoint" }
aws-smithy-types = { path = "../../../rust-runtime/aws-smithy-types"}
aws-smithy-types = { path = "../../../rust-runtime/aws-smithy-types" }
tracing-test = "0.2.1"
aws-smithy-async = { path = "../../../rust-runtime/aws-smithy-async", features = ["test-util"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
14 changes: 9 additions & 5 deletions aws/rust-runtime/aws-sig-auth/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

use std::error::Error;
use std::fmt::{Display, Formatter};
use std::time::SystemTime;

use aws_smithy_http::middleware::MapRequest;
use aws_smithy_http::operation::Request;
use aws_smithy_http::property_bag::PropertyBag;

use aws_credential_types::Credentials;
use aws_sigv4::http_request::SignableBody;
use aws_smithy_async::time::SharedTimeSource;
use aws_types::region::SigningRegion;
use aws_types::SigningService;

Expand Down Expand Up @@ -145,9 +145,10 @@ fn signing_config(
let payload_override = config.get::<SignableBody<'static>>();
let request_config = RequestConfig {
request_ts: config
.get::<SystemTime>()
.copied()
.unwrap_or_else(SystemTime::now),
.get::<SharedTimeSource>()
.map(|t| t.now())
// TODO(enableNewSmithyRuntime): Remove this fallback
.unwrap_or_else(|| SharedTimeSource::default().now()),
region,
payload_override,
service: signing_service,
Expand Down Expand Up @@ -199,6 +200,7 @@ mod test {

use aws_credential_types::Credentials;
use aws_endpoint::AwsAuthStage;
use aws_smithy_async::time::SharedTimeSource;
use aws_types::region::{Region, SigningRegion};
use aws_types::SigningService;

Expand Down Expand Up @@ -249,7 +251,9 @@ mod test {
let req = operation::Request::new(req)
.augment(|req, conf| {
conf.insert(region.clone());
conf.insert(UNIX_EPOCH + Duration::new(1611160427, 0));
conf.insert(SharedTimeSource::new(
UNIX_EPOCH + Duration::new(1611160427, 0),
));
conf.insert(SigningService::from_static("kinesis"));
conf.insert(endpoint);
Result::<_, Infallible>::Ok(req)
Expand Down
23 changes: 23 additions & 0 deletions aws/rust-runtime/aws-types/src/sdk_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::sync::Arc;
use aws_credential_types::cache::CredentialsCache;
use aws_credential_types::provider::SharedCredentialsProvider;
use aws_smithy_async::rt::sleep::AsyncSleep;
use aws_smithy_async::time::{SharedTimeSource, TimeSource};
use aws_smithy_client::http_connector::HttpConnector;
use aws_smithy_types::retry::RetryConfig;
use aws_smithy_types::timeout::TimeoutConfig;
Expand All @@ -40,6 +41,8 @@ If no dual-stack endpoint is available the request MAY return an error.
**Note**: Some services do not offer dual-stack as a configurable parameter (e.g. Code Catalyst). For
these services, this setting has no effect"
};

(time_source) => { "The time source use to use for this client. This only needs to be required for creating deterministic tests or platforms where `SystemTime::now()` is not supported." };
}
}

Expand All @@ -53,6 +56,7 @@ pub struct SdkConfig {
endpoint_url: Option<String>,
retry_config: Option<RetryConfig>,
sleep_impl: Option<Arc<dyn AsyncSleep>>,
time_source: Option<SharedTimeSource>,
timeout_config: Option<TimeoutConfig>,
http_connector: Option<HttpConnector>,
use_fips: Option<bool>,
Expand All @@ -73,6 +77,7 @@ pub struct Builder {
endpoint_url: Option<String>,
retry_config: Option<RetryConfig>,
sleep_impl: Option<Arc<dyn AsyncSleep>>,
time_source: Option<SharedTimeSource>,
timeout_config: Option<TimeoutConfig>,
http_connector: Option<HttpConnector>,
use_fips: Option<bool>,
Expand Down Expand Up @@ -499,6 +504,18 @@ impl Builder {
self
}

#[doc = docs_for!(time_source)]
pub fn time_source(mut self, time_source: impl TimeSource + 'static) -> Self {
self.set_time_source(Some(SharedTimeSource::new(time_source)));
self
}

#[doc = docs_for!(time_source)]
pub fn set_time_source(&mut self, time_source: Option<SharedTimeSource>) -> &mut Self {
self.time_source = time_source;
self
}

/// Build a [`SdkConfig`](SdkConfig) from this builder
pub fn build(self) -> SdkConfig {
SdkConfig {
Expand All @@ -513,6 +530,7 @@ impl Builder {
http_connector: self.http_connector,
use_fips: self.use_fips,
use_dual_stack: self.use_dual_stack,
time_source: self.time_source,
}
}
}
Expand Down Expand Up @@ -554,6 +572,11 @@ impl SdkConfig {
self.credentials_provider.as_ref()
}

/// Configured time source
pub fn time_source(&self) -> Option<SharedTimeSource> {
self.time_source.clone()
}

/// Configured app name
pub fn app_name(&self) -> Option<&AppName> {
self.app_name.as_ref()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ class CustomizableOperationTestHelpers(runtimeConfig: RuntimeConfig) :
"BeforeTransmitInterceptorContextMut" to RuntimeType.smithyRuntimeApi(runtimeConfig)
.resolve("client::interceptors::BeforeTransmitInterceptorContextMut"),
"ConfigBag" to RuntimeType.smithyRuntimeApi(runtimeConfig).resolve("config_bag::ConfigBag"),
"ConfigBagAccessors" to RuntimeType.smithyRuntimeApi(runtimeConfig).resolve("client::orchestrator::ConfigBagAccessors"),
"ConfigBagAccessors" to RuntimeType.smithyRuntimeApi(runtimeConfig)
.resolve("client::orchestrator::ConfigBagAccessors"),
"http" to CargoDependency.Http.toType(),
"InterceptorContext" to RuntimeType.smithyRuntimeApi(runtimeConfig)
.resolve("client::interceptors::InterceptorContext"),
"RequestTime" to RuntimeType.smithyRuntimeApi(runtimeConfig).resolve("client::orchestrator::RequestTime"),
"StaticTimeSource" to CargoDependency.smithyAsync(runtimeConfig).withFeature("test-util").toType()
.resolve("test_util::StaticTimeSource"),
"SharedInterceptor" to RuntimeType.smithyRuntimeApi(runtimeConfig)
.resolve("client::interceptors::SharedInterceptor"),
"TestParamsSetterInterceptor" to CargoDependency.smithyRuntime(runtimeConfig).withFeature("test-util")
Expand All @@ -48,7 +50,7 @@ class CustomizableOperationTestHelpers(runtimeConfig: RuntimeConfig) :
##[doc(hidden)]
// This is a temporary method for testing. NEVER use it in production
pub fn request_time_for_tests(mut self, request_time: ::std::time::SystemTime) -> Self {
self.operation.properties_mut().insert(request_time);
self.operation.properties_mut().insert(#{StaticTimeSource}::new(request_time));
self
}
Expand All @@ -70,7 +72,7 @@ class CustomizableOperationTestHelpers(runtimeConfig: RuntimeConfig) :
pub fn request_time_for_tests(mut self, request_time: ::std::time::SystemTime) -> Self {
use #{ConfigBagAccessors};
let interceptor = #{TestParamsSetterInterceptor}::new(move |_: &mut #{BeforeTransmitInterceptorContextMut}<'_>, cfg: &mut #{ConfigBag}| {
cfg.set_request_time(#{RequestTime}::new(request_time));
cfg.set_request_time(#{StaticTimeSource}::new(request_time));
});
self.interceptors.push(#{SharedInterceptor}::new(interceptor));
self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class GenericSmithySdkConfigSettings : ClientCodegenDecorator {
${section.serviceConfigBuilder}.set_sleep_impl(${section.sdkConfig}.sleep_impl());
${section.serviceConfigBuilder}.set_http_connector(${section.sdkConfig}.http_connector().cloned());
${section.serviceConfigBuilder}.set_time_source(${section.sdkConfig}.time_source().clone());
""",
)
},
Expand Down
2 changes: 1 addition & 1 deletion aws/sdk/integration-tests/s3/tests/checksums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ async fn test_checksum_on_streaming_response(
);
let sdk_config = SdkConfig::builder()
.credentials_provider(SharedCredentialsProvider::new(Credentials::for_tests()))
.time_source(UNIX_EPOCH + Duration::from_secs(1624036048))
.region(Region::new("us-east-1"))
.http_connector(conn.clone())
.build();
Expand All @@ -73,7 +74,6 @@ async fn test_checksum_on_streaming_response(
.customize()
.await
.unwrap()
.request_time_for_tests(UNIX_EPOCH + Duration::from_secs(1624036048))
.user_agent_for_tests()
.send()
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ mod util;
use aws_http::user_agent::AwsUserAgent;
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::Client;
use aws_smithy_async::test_util::StaticTimeSource;
use aws_smithy_client::dvr;
use aws_smithy_client::dvr::MediaType;
use aws_smithy_client::erase::DynConnector;
use aws_smithy_runtime_api::client::interceptors::{
BeforeTransmitInterceptorContextMut, Interceptor,
};
use aws_smithy_runtime_api::client::orchestrator::ConfigBagAccessors;
use aws_smithy_runtime_api::client::orchestrator::RequestTime;
use aws_smithy_runtime_api::config_bag::ConfigBag;
use http::header::USER_AGENT;
use http::HeaderValue;
Expand Down Expand Up @@ -66,7 +66,7 @@ impl Interceptor for RequestTimeResetInterceptor {
_context: &mut BeforeTransmitInterceptorContextMut<'_>,
cfg: &mut ConfigBag,
) -> Result<(), aws_smithy_runtime_api::client::interceptors::BoxError> {
cfg.set_request_time(RequestTime::new(UNIX_EPOCH));
cfg.set_request_time(StaticTimeSource::new(UNIX_EPOCH));

Ok(())
}
Expand All @@ -81,7 +81,7 @@ impl Interceptor for RequestTimeAdvanceInterceptor {
cfg: &mut ConfigBag,
) -> Result<(), aws_smithy_runtime_api::client::interceptors::BoxError> {
let request_time = cfg.request_time().unwrap();
let request_time = RequestTime::new(request_time.system_time() + self.0);
let request_time = StaticTimeSource::new(request_time.now() + self.0);
cfg.set_request_time(request_time);

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ use aws_smithy_client::dvr::MediaType;
use aws_smithy_client::erase::DynConnector;
use aws_smithy_runtime::client::retries::strategy::FixedDelayRetryStrategy;
use aws_smithy_runtime_api::client::interceptors::InterceptorRegistrar;
use aws_smithy_runtime_api::client::orchestrator::{ConfigBagAccessors, RequestTime};
use aws_smithy_runtime_api::client::orchestrator::ConfigBagAccessors;
use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugin;
use aws_smithy_runtime_api::config_bag::ConfigBag;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::{Duration, UNIX_EPOCH};

#[derive(Debug)]
struct FixupPlugin {
client: Client,
timestamp: SystemTime,
}

// # One SDK operation invocation.
Expand All @@ -44,7 +43,6 @@ async fn three_retries_and_then_success() {
.bucket("test-bucket");

cfg.put(params_builder);
cfg.set_request_time(RequestTime::new(self.timestamp.clone()));
cfg.put(AwsUserAgent::for_tests());
cfg.put(InvocationId::for_tests());
cfg.set_retry_strategy(FixedDelayRetryStrategy::one_second_delay());
Expand All @@ -58,11 +56,11 @@ async fn three_retries_and_then_success() {
.credentials_provider(Credentials::for_tests())
.region(Region::new("us-east-1"))
.http_connector(DynConnector::new(conn.clone()))
.time_source(UNIX_EPOCH + Duration::from_secs(1624036048))
.build();
let client = Client::from_conf(config);
let fixup = FixupPlugin {
client: client.clone(),
timestamp: UNIX_EPOCH + Duration::from_secs(1624036048),
};

let resp = dbg!(
Expand Down
8 changes: 3 additions & 5 deletions aws/sra-test/integration-tests/aws-sdk-s3/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

use aws_http::user_agent::AwsUserAgent;
use aws_runtime::invocation_id::InvocationId;
use aws_smithy_async::test_util::StaticTimeSource;
use aws_smithy_runtime_api::client::interceptors::{
BeforeTransmitInterceptorContextMut, Interceptor, InterceptorRegistrar,
};
use aws_smithy_runtime_api::client::orchestrator::{ConfigBagAccessors, RequestTime};
use aws_smithy_runtime_api::client::orchestrator::ConfigBagAccessors;
use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugin;
use aws_smithy_runtime_api::config_bag::ConfigBag;
use http::header::USER_AGENT;
Expand All @@ -18,16 +19,13 @@ use std::time::SystemTime;
pub const X_AMZ_USER_AGENT: HeaderName = HeaderName::from_static("x-amz-user-agent");

#[derive(Debug)]
pub struct FixupPlugin {
pub timestamp: SystemTime,
}
pub struct FixupPlugin;
impl RuntimePlugin for FixupPlugin {
fn configure(
&self,
cfg: &mut ConfigBag,
_interceptors: &mut InterceptorRegistrar,
) -> Result<(), aws_smithy_runtime_api::client::runtime_plugin::BoxError> {
cfg.set_request_time(RequestTime::new(self.timestamp.clone()));
cfg.put(InvocationId::for_tests());
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ class ResiliencyServiceRuntimePluginCustomization : ServiceRuntimePluginCustomiz
if let Some(timeout_config) = self.handle.conf.timeout_config() {
${section.configBagName}.put(timeout_config.clone());
}
${section.configBagName}.put(self.handle.conf.time_source.clone());
""",
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import software.amazon.smithy.rust.codegen.client.smithy.customizations.Resilien
import software.amazon.smithy.rust.codegen.client.smithy.customizations.ResiliencyServiceRuntimePluginCustomization
import software.amazon.smithy.rust.codegen.client.smithy.generators.ServiceRuntimePluginCustomization
import software.amazon.smithy.rust.codegen.client.smithy.generators.config.ConfigCustomization
import software.amazon.smithy.rust.codegen.client.smithy.generators.config.TimeSourceOperationCustomization
import software.amazon.smithy.rust.codegen.client.smithy.generators.config.timeSourceCustomization
import software.amazon.smithy.rust.codegen.core.rustlang.Feature
import software.amazon.smithy.rust.codegen.core.smithy.RustCrate
import software.amazon.smithy.rust.codegen.core.smithy.customizations.AllowLintsCustomization
Expand Down Expand Up @@ -47,7 +49,8 @@ class RequiredCustomizations : ClientCodegenDecorator {
IdempotencyTokenGenerator(codegenContext, operation) +
EndpointPrefixGenerator(codegenContext, operation) +
HttpChecksumRequiredGenerator(codegenContext, operation) +
HttpVersionListCustomization(codegenContext, operation)
HttpVersionListCustomization(codegenContext, operation) +
TimeSourceOperationCustomization()

override fun configCustomizations(
codegenContext: ClientCodegenContext,
Expand All @@ -57,9 +60,9 @@ class RequiredCustomizations : ClientCodegenDecorator {
if (codegenContext.smithyRuntimeMode.generateOrchestrator) {
baseCustomizations + ResiliencyConfigCustomization(codegenContext) + InterceptorConfigCustomization(
codegenContext,
)
) + timeSourceCustomization(codegenContext)
} else {
baseCustomizations + ResiliencyConfigCustomization(codegenContext)
baseCustomizations + ResiliencyConfigCustomization(codegenContext) + timeSourceCustomization(codegenContext)
}

override fun libRsCustomizations(
Expand Down
Loading

0 comments on commit d0a8f56

Please sign in to comment.