Skip to content

Commit

Permalink
Remove EventLoops from CredentialProviders (#554)
Browse files Browse the repository at this point in the history
* Make credential providers async

* Fix tests

Also add support for initialising an ExpiringValue with an updating closure. Needed to add another state to get this working.

* getTaskProviderTask -> getCredentialProviderTask

* Format copyright 2023

* validation changes

* Fix issues with credential providers

Add error state for ExpiringValue where previous getValue returned an error

* Don't throw error during credential provider selection

If credential provider fails to provide a credential don't throw an error when you are shutting it down

* Shutdown test for rotating credential provider

And fixed a bug

* Added testDeferredCredentialProviderSetupShutdown
  • Loading branch information
adam-fowler committed Jul 5, 2023
1 parent d2000b1 commit 8a215cd
Show file tree
Hide file tree
Showing 33 changed files with 421 additions and 514 deletions.
4 changes: 2 additions & 2 deletions .swiftformat
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Minimum swiftformat version
--minversion 0.47.4
--minversion 0.51.0

# Swift version
--swiftversion 5.1
--swiftversion 5.5

# file options
--exclude .build
Expand Down
62 changes: 11 additions & 51 deletions Sources/SotoCore/AWSClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -115,61 +115,26 @@ public final class AWSClient: Sendable {
///
/// - Throws: AWSClient.ClientError.alreadyShutdown: You have already shutdown the client
public func syncShutdown() throws {
let errorStorageLock = NIOLock()
var errorStorage: Error?
let errorStorage: NIOLockedValueBox<Error?> = .init(nil)
let continuation = DispatchWorkItem {}
self.shutdown(queue: DispatchQueue(label: "aws-client.shutdown")) { error in
if let error = error {
errorStorageLock.withLock {
Task {
do {
try await shutdown()
} catch {
errorStorage.withLockedValue { errorStorage in
errorStorage = error
}
}
continuation.perform()
}
continuation.wait()
try errorStorageLock.withLock {
try errorStorage.withLockedValue { errorStorage in
if let error = errorStorage {
throw error
}
}
}

/// Shutdown AWSClient asynchronously.
///
/// Before an `AWSClient` is deleted you need to call this function or the synchronous
/// version `syncShutdown` to do a clean shutdown of the client. It cleans up `CredentialProvider` tasks and shuts down
/// the HTTP client if it was created by the `AWSClient`. Given we could be destroying the `EventLoopGroup` the client
/// uses, we have to use a `DispatchQueue` to run some of this work on.
///
/// - Parameters:
/// - queue: Dispatch Queue to run shutdown on
/// - callback: Callback called when shutdown is complete. If there was an error it will return with Error in callback
public func shutdown(queue: DispatchQueue = .global(), _ callback: @escaping (Error?) -> Void) {
guard self.isShutdown.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged else {
callback(ClientError.alreadyShutdown)
return
}
let eventLoop = eventLoopGroup.next()
// ignore errors from credential provider. Don't need shutdown erroring because no providers were available
credentialProvider.shutdown(on: eventLoop).whenComplete { _ in
// if httpClient was created by AWSClient then it is required to shutdown the httpClient.
switch self.httpClientProvider {
case .createNew, .createNewWithEventLoopGroup:
self.httpClient.shutdown(queue: queue) { error in
if let error = error {
self.clientLogger.log(level: self.options.errorLogLevel, "Error shutting down HTTP client", metadata: [
"aws-error": "\(error)",
])
}
callback(error)
}

case .shared:
callback(nil)
}
}
}

// MARK: Member structs/enums

/// Errors returned by `AWSClient` code
Expand Down Expand Up @@ -246,7 +211,7 @@ extension AWSClient {
}
// shutdown credential provider ignoring any errors as credential provider that doesn't initialize
// can cause the shutdown process to fail
try? await self.credentialProvider.shutdown(on: self.eventLoopGroup.any()).get()
try? await self.credentialProvider.shutdown()
// if httpClient was created by AWSClient then it is required to shutdown the httpClient.
switch self.httpClientProvider {
case .createNew, .createNewWithEventLoopGroup:
Expand Down Expand Up @@ -485,7 +450,7 @@ extension AWSClient {
logger.log(level: self.options.requestLogLevel, "AWS Request")
do {
// get credentials
let credential = try await credentialProvider.getCredential(on: self.eventLoopGroup.any(), logger: logger).get()
let credential = try await credentialProvider.getCredential(logger: logger)
// construct signer
let signer = AWSSigner(credentials: credential, name: config.signingName, region: config.region.rawValue)
// create request and sign with signer
Expand Down Expand Up @@ -573,12 +538,7 @@ extension AWSClient {
/// - logger: optional logger to use
/// - Returns: Credential
public func getCredential(logger: Logger = AWSClient.loggingDisabled) async throws -> Credential {
let eventLoop = self.eventLoopGroup.any()
if let asyncCredentialProvider = self.credentialProvider as? AsyncCredentialProvider {
return try await asyncCredentialProvider.getCredential(on: eventLoop, logger: logger)
} else {
return try await self.credentialProvider.getCredential(on: eventLoop, logger: logger).get()
}
try await self.credentialProvider.getCredential(logger: logger)
}

/// Generate a signed URL
Expand Down Expand Up @@ -643,7 +603,7 @@ extension AWSClient {
}

func createSigner(serviceConfig: AWSServiceConfig, logger: Logger) async throws -> AWSSigner {
let credential = try await credentialProvider.getCredential(on: eventLoopGroup.next(), logger: logger).get()
let credential = try await credentialProvider.getCredential(logger: logger)
return AWSSigner(credentials: credential, name: serviceConfig.signingName, region: serviceConfig.region.rawValue)
}
}
Expand Down
73 changes: 63 additions & 10 deletions Sources/SotoCore/Concurrency/ExpiringValue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,18 @@ actor ExpiringValue<T> {
enum State {
/// No value is stored
case noValue
/// Initial call waiting on a value to be generated. Cannot use `waitingOnValue`` in
/// initial call as it means we would have to setup it up before all stored properties
/// have been initialized
case initialWaitingOnValue(Task<(T, Date), Error>)
/// Waiting on a value to be generated
case waitingOnValue(Task<T, Error>)
/// Is holding a value
case withValue(T, Date)
/// Is holding a value, and there is a task in progress to update it
case withValueAndWaiting(T, Date, Task<T, Error>)
/// Error
case error(Error)
}

var state: State
Expand All @@ -46,51 +52,98 @@ actor ExpiringValue<T> {
self.state = .withValue(initialValue, expires)
}

init(threshold: TimeInterval = 2, getExpiringValue: @escaping @Sendable () async throws -> (T, Date)) {
self.threshold = threshold
let task = Task {
try await getExpiringValue()
}
self.state = .initialWaitingOnValue(task)
}

func getValue(getExpiringValue: @escaping @Sendable () async throws -> (T, Date)) async throws -> T {
let task: Task<T, Error>
switch self.state {
case .noValue:
let task = self.getValueTask(getExpiringValue)
task = try self.getValueTask(getExpiringValue)
self.state = .waitingOnValue(task)
return try await task.value

case .waitingOnValue(let task):
return try await task.value
case .initialWaitingOnValue(let task):
return try await withTaskCancellationHandler {
switch await task.result {
case .success(let result):
self.state = .withValue(result.0, result.1)
return result.0
case .failure(let error):
self.state = .error(error)
throw error
}
} onCancel: {
task.cancel()
}

case .waitingOnValue(let waitingOnTask):
task = waitingOnTask

case .withValue(let value, let expires):
if expires.timeIntervalSinceNow < 0 {
// value has expired, create new task to update value and
// return the result of that task
let task = self.getValueTask(getExpiringValue)
task = try self.getValueTask(getExpiringValue)
self.state = .waitingOnValue(task)
return try await task.value
} else if expires.timeIntervalSinceNow < self.threshold {
// value is about to expire, create new task to update value and
// return current value
let task = self.getValueTask(getExpiringValue)
let task = try self.getValueTask(getExpiringValue)
self.state = .withValueAndWaiting(value, expires, task)
return value
} else {
return value
}

case .withValueAndWaiting(let value, let expires, let task):
case .withValueAndWaiting(let value, let expires, let waitingOnTask):
if expires.timeIntervalSinceNow < 0 {
// as value has expired wait for task to finish and return result
return try await task.value
task = waitingOnTask
} else {
// value hasn't expired so return current value
return value
}

case .error(let error):
throw error
}
return try await withTaskCancellationHandler {
switch await task.result {
case .success(let value):
return value
case .failure(let error):
self.state = .error(error)
throw error
}
} onCancel: {
task.cancel()
}
}

/// Create task that will return a new version of the value and a date it will expire
/// - Parameter getExpiringValue: Function return value and expiration date
func getValueTask(_ getExpiringValue: @escaping @Sendable () async throws -> (T, Date)) -> Task<T, Error> {
func getValueTask(_ getExpiringValue: @escaping @Sendable () async throws -> (T, Date)) throws -> Task<T, Error> {
try Task.checkCancellation()
return Task {
let (value, expires) = try await getExpiringValue()
self.state = .withValue(value, expires)
return value
}
}

func cancel() {
switch self.state {
case .initialWaitingOnValue(let task):
task.cancel()
case .waitingOnValue(let task), .withValueAndWaiting(_, _, let task):
task.cancel()
default:
break
}
}
}
46 changes: 22 additions & 24 deletions Sources/SotoCore/Credential/ConfigFileCredentialProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Soto for AWS open source project
//
// Copyright (c) 2017-2022 the Soto project authors
// Copyright (c) 2017-2023 the Soto project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand All @@ -18,28 +18,32 @@ import NIOCore
import SotoSignerV4

final class ConfigFileCredentialProvider: CredentialProviderSelector {
/// promise to find a credential provider
let startupPromise: EventLoopPromise<CredentialProvider>
/// lock for access to _internalProvider.
let lock = NIOLock()
/// internal version of internal provider. Should access this through `internalProvider`
var _internalProvider: CredentialProvider?

private let getProviderTask: Task<CredentialProvider, Error>
init(
credentialsFilePath: String,
configFilePath: String,
profile: String? = nil,
context: CredentialProviderFactory.Context,
endpoint: String? = nil
) {
self.startupPromise = context.eventLoop.makePromise(of: CredentialProvider.self)
self.startupPromise.futureResult.whenSuccess { result in
self.internalProvider = result
self.getProviderTask = Task {
let profile = profile ?? Environment["AWS_PROFILE"] ?? ConfigFileLoader.defaultProfile
return try await ConfigFileCredentialProvider.credentialProvider(
from: credentialsFilePath,
configFilePath: configFilePath,
for: profile,
context: context,
endpoint: endpoint
)
}
}

let profile = profile ?? Environment["AWS_PROFILE"] ?? ConfigFileLoader.defaultProfile
Self.credentialProvider(from: credentialsFilePath, configFilePath: configFilePath, for: profile, context: context, endpoint: endpoint)
.cascade(to: self.startupPromise)
func getCredentialProviderTask() async throws -> CredentialProvider {
try await self.getProviderTask.value
}

func cancelCredentialProviderTask() {
self.getProviderTask.cancel()
}

/// Credential provider from shared credentials and profile configuration files
Expand All @@ -57,16 +61,14 @@ final class ConfigFileCredentialProvider: CredentialProviderSelector {
for profile: String,
context: CredentialProviderFactory.Context,
endpoint: String?
) -> EventLoopFuture<CredentialProvider> {
return ConfigFileLoader.loadSharedCredentials(
) async throws -> CredentialProvider {
let sharedCredentials = try await ConfigFileLoader.loadSharedCredentials(
credentialsFilePath: credentialsFilePath,
configFilePath: configFilePath,
profile: profile,
context: context
)
.flatMapThrowing { sharedCredentials in
return try self.credentialProvider(from: sharedCredentials, context: context, endpoint: endpoint)
}
).get()
return try self.credentialProvider(from: sharedCredentials, context: context, endpoint: endpoint)
}

/// Generate credential provider based on shared credentials and profile configuration
Expand Down Expand Up @@ -100,7 +102,3 @@ final class ConfigFileCredentialProvider: CredentialProviderSelector {
}
}
}

// can use @unchecked Sendable here as `_internalProvider`` is accessed via `internalProvider` which
// protects access with a `NIOLock`
extension ConfigFileCredentialProvider: @unchecked Sendable {}
2 changes: 1 addition & 1 deletion Sources/SotoCore/Credential/Credential+IsEmpty.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// This source file is part of the Soto for AWS open source project
//
// Copyright (c) 2017-2020 the Soto project authors
// Copyright (c) 2017-2023 the Soto project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down
34 changes: 0 additions & 34 deletions Sources/SotoCore/Credential/CredentialProvider+async.swift

This file was deleted.

Loading

0 comments on commit 8a215cd

Please sign in to comment.