Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Async calls from AsyncHTTPClient #555

Merged
merged 5 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 43 additions & 40 deletions Sources/SotoCore/AWSClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,6 @@ extension AWSClient {
configuration: serviceConfig
)
},
execute: { request, logger in
return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger)
},
processResponse: { _ in
return
},
Expand Down Expand Up @@ -294,9 +291,6 @@ extension AWSClient {
configuration: serviceConfig
)
},
execute: { request, logger in
return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger)
},
processResponse: { _ in
return
},
Expand Down Expand Up @@ -331,11 +325,8 @@ extension AWSClient {
configuration: serviceConfig
)
},
execute: { request, logger in
return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger)
},
processResponse: { response in
return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig)
return try await self.validate(operation: operationName, response: response, serviceConfig: serviceConfig)
},
config: serviceConfig,
logger: logger
Expand Down Expand Up @@ -374,11 +365,8 @@ extension AWSClient {
configuration: serviceConfig
)
},
execute: { request, logger in
return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger)
},
processResponse: { response in
return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig)
return try await self.validate(operation: operationName, response: response, serviceConfig: serviceConfig)
},
config: serviceConfig,
logger: logger
Expand Down Expand Up @@ -418,11 +406,8 @@ extension AWSClient {
configuration: serviceConfig
)
},
execute: { request, logger in
return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger, stream: stream)
},
processResponse: { response in
return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig)
return try await self.validate(operation: operationName, response: response, serviceConfig: serviceConfig)
},
config: serviceConfig,
logger: logger
Expand All @@ -433,8 +418,7 @@ extension AWSClient {
internal func execute<Output>(
operation operationName: String,
createRequest: @escaping () throws -> AWSRequest,
execute: @escaping (AWSHTTPRequest, Logger) async throws -> AWSHTTPResponse,
processResponse: @escaping (AWSHTTPResponse) throws -> Output,
processResponse: @escaping (AWSHTTPResponse) async throws -> Output,
config: AWSServiceConfig,
logger: Logger = AWSClient.loggingDisabled
) async throws -> Output {
Expand Down Expand Up @@ -467,9 +451,9 @@ extension AWSClient {
}
try Task.checkCancellation()
let response = try await self.invoke(
request: awsRequest,
with: config,
logger: logger,
request: { try await execute(awsRequest, logger) },
processResponse: processResponse,
streaming: streaming
)
Expand All @@ -494,21 +478,22 @@ extension AWSClient {
}

func invoke<Output>(
request: AWSHTTPRequest,
with serviceConfig: AWSServiceConfig,
logger: Logger,
request: @escaping () async throws -> AWSHTTPResponse,
processResponse: @escaping (AWSHTTPResponse) throws -> Output,
processResponse: @escaping (AWSHTTPResponse) async throws -> Output,
streaming: Bool
) async throws -> Output {
var attempt = 0
while true {
do {
let response = try await request()
let response = try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, logger: logger)
// if it returns an HTTP status code outside 2xx then throw an error
guard (200..<300).contains(response.status.code) else {
throw self.createError(for: response, serviceConfig: serviceConfig, logger: logger)
let error = try await self.createError(for: response, serviceConfig: serviceConfig, logger: logger)
throw error
}
let output = try processResponse(response)
let output = try await processResponse(response)
return output
} catch {
// if streaming and the error returned is an AWS error fail immediately. Do not attempt
Expand Down Expand Up @@ -611,36 +596,54 @@ extension AWSClient {
// response validator
extension AWSClient {
/// Generate an AWS Response from the operation HTTP response and return the output shape from it. This is only every called if the response includes a successful http status code
internal func validate<Output: AWSDecodableShape>(operation operationName: String, response: AWSHTTPResponse, serviceConfig: AWSServiceConfig) throws -> Output {
internal func validate<Output: AWSDecodableShape>(
operation operationName: String,
response: AWSHTTPResponse,
serviceConfig: AWSServiceConfig
) async throws -> Output {
assert((200..<300).contains(response.status.code), "Shouldn't get here if error was returned")

let raw = Output._options.contains(.rawPayload) == true
let awsResponse = try AWSResponse(from: response, serviceProtocol: serviceConfig.serviceProtocol, raw: raw)
let awsResponse = try await AWSResponse(from: response, serviceProtocol: serviceConfig.serviceProtocol, raw: raw)
.applyMiddlewares(serviceConfig.middlewares + middlewares, config: serviceConfig)

return try awsResponse.generateOutputShape(operation: operationName)
}

/// Create error from HTTPResponse. This is only called if we received an unsuccessful http status code.
internal func createError(for response: AWSHTTPResponse, serviceConfig: AWSServiceConfig, logger: Logger) -> Error {
internal func createError(for response: AWSHTTPResponse, serviceConfig: AWSServiceConfig, logger: Logger) async throws -> Error {
// if we can create an AWSResponse and create an error from it return that
if let awsResponse = try? AWSResponse(from: response, serviceProtocol: serviceConfig.serviceProtocol)
.applyMiddlewares(serviceConfig.middlewares + middlewares, config: serviceConfig),
let error = awsResponse.generateError(serviceConfig: serviceConfig, logLevel: options.errorLogLevel, logger: logger)
{
return error
} else {
let awsResponse: AWSResponse
do {
awsResponse = try await AWSResponse(from: response, serviceProtocol: serviceConfig.serviceProtocol)
} catch {
// else return "Unhandled error message" with rawBody attached
var rawBodyString: String?
if var body = response.body {
rawBodyString = body.readString(length: body.readableBytes)
}
let context = AWSErrorContext(
message: "Unhandled Error",
responseCode: response.status,
headers: response.headers
)
return AWSRawError(rawBody: rawBodyString, context: context)
return AWSRawError(rawBody: nil, context: context)
}
do {
let awsResponseWithMiddleware = try awsResponse.applyMiddlewares(serviceConfig.middlewares + middlewares, config: serviceConfig)
if let error = awsResponseWithMiddleware.generateError(
serviceConfig: serviceConfig,
logLevel: options.errorLogLevel,
logger: logger
) {
return error
} else {
// else return "Unhandled error message" with rawBody attached
let context = AWSErrorContext(
message: "Unhandled Error",
responseCode: response.status,
headers: response.headers
)
return AWSRawError(rawBody: awsResponseWithMiddleware.body.asString(), context: context)
}
} catch {
return error
}
}
}
Expand Down
60 changes: 60 additions & 0 deletions Sources/SotoCore/Concurrency/AnyAsyncSequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Soto for AWS open source project
//
// Copyright (c) 2023 the Soto project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of Soto project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2022 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@usableFromInline
struct AnyAsyncSequence<Element>: Sendable, AsyncSequence {
@usableFromInline typealias AsyncIteratorNextCallback = () async throws -> Element?

@usableFromInline struct AsyncIterator: AsyncIteratorProtocol {
@usableFromInline let nextCallback: AsyncIteratorNextCallback

@inlinable init(nextCallback: @escaping AsyncIteratorNextCallback) {
self.nextCallback = nextCallback
}

@inlinable mutating func next() async throws -> Element? {
try await self.nextCallback()
}
}

@usableFromInline var makeAsyncIteratorCallback: @Sendable () -> AsyncIteratorNextCallback

@inlinable init<SequenceOfBytes>(
_ asyncSequence: SequenceOfBytes
) where SequenceOfBytes: AsyncSequence & Sendable, SequenceOfBytes.Element == Element {
self.makeAsyncIteratorCallback = {
var iterator = asyncSequence.makeAsyncIterator()
return {
try await iterator.next()
}
}
}

@inlinable func makeAsyncIterator() -> AsyncIterator {
.init(nextCallback: self.makeAsyncIteratorCallback())
}
}
55 changes: 55 additions & 0 deletions Sources/SotoCore/Concurrency/ByteBufferSequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Soto for AWS open source project
//
// Copyright (c) 2017-2023 the Soto project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of Soto project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore
import NIOPosix

/// Provide ByteBuffer as an AsyncSequence of equal size blocks
struct ByteBufferAsyncSequence: AsyncSequence, Sendable {
typealias Element = ByteBuffer

let byteBuffer: ByteBuffer
let chunkSize: Int

init(
_ byteBuffer: ByteBuffer,
chunkSize: Int
) {
self.byteBuffer = byteBuffer
self.chunkSize = chunkSize
}

struct AsyncIterator: AsyncIteratorProtocol {
var byteBuffer: ByteBuffer
let chunkSize: Int

mutating func next() async throws -> ByteBuffer? {
let size = Swift.min(self.chunkSize, self.byteBuffer.readableBytes)
if size > 0 {
return self.byteBuffer.readSlice(length: size)
}
return nil
}
}

func makeAsyncIterator() -> AsyncIterator {
.init(byteBuffer: self.byteBuffer, chunkSize: self.chunkSize)
}
}

extension ByteBuffer {
func asyncSequence(chunkSize: Int) -> ByteBufferAsyncSequence {
return ByteBufferAsyncSequence(self, chunkSize: chunkSize)
}
}
18 changes: 13 additions & 5 deletions Sources/SotoCore/Credential/MetaDataCredentialProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ extension MetaDataClient {
}
}

struct MetadataHTTPResponse {
public var status: HTTPResponseStatus
public var headers: HTTPHeaders
public var body: ByteBuffer?
}

struct ECSMetaDataClient: MetaDataClient {
typealias MetaData = ECSMetaData

Expand Down Expand Up @@ -111,17 +117,18 @@ struct ECSMetaDataClient: MetaDataClient {
}

func getMetaData(logger: Logger) async throws -> ECSMetaData {
let response = try await request(url: endpointURL, timeout: 2, logger: logger)
let response = try await request(url: endpointURL, timeout: .seconds(2), logger: logger)
guard let body = response.body else {
throw MetaDataClientError.missingMetaData
}
return try self.decoder.wrappedValue.decode(MetaData.self, from: body)
}

private func request(url: String, timeout: TimeInterval, logger: Logger) async throws -> AWSHTTPResponse {
private func request(url: String, timeout: TimeAmount, logger: Logger) async throws -> MetadataHTTPResponse {
try Task.checkCancellation()
let request = AWSHTTPRequest(url: URL(string: url)!, method: .GET, headers: [:], body: .empty)
return try await httpClient.execute(request: request, timeout: TimeAmount.seconds(2), logger: logger)
let response = try await httpClient.execute(request: request, timeout: timeout, logger: logger)
return try await .init(status: response.status, headers: response.headers, body: response.body.collect(upTo: .max))
}
}

Expand Down Expand Up @@ -243,9 +250,10 @@ struct InstanceMetaDataClient: MetaDataClient {
method: HTTPMethod = .GET,
headers: HTTPHeaders = .init(),
logger: Logger
) async throws -> AWSHTTPResponse {
) async throws -> MetadataHTTPResponse {
try Task.checkCancellation()
let request = AWSHTTPRequest(url: url, method: method, headers: headers, body: .empty)
return try await httpClient.execute(request: request, timeout: TimeAmount.seconds(2), logger: logger)
let response = try await httpClient.execute(request: request, timeout: TimeAmount.seconds(2), logger: logger)
return try await .init(status: response.status, headers: response.headers, body: response.body.collect(upTo: .max))
}
}
Loading