Skip to content

Commit

Permalink
Use Async calls from AsyncHTTPClient (#555)
Browse files Browse the repository at this point in the history
* Use async HTTPClient calls

Currently streaming is not working

* Comments

* Extract HTTPBody out so it can be used in request

* Flush body, when not expecting anything

* Remove eventLoop from function comment headers
  • Loading branch information
adam-fowler committed Jul 5, 2023
1 parent 68479d9 commit b178f33
Show file tree
Hide file tree
Showing 13 changed files with 740 additions and 770 deletions.
5 changes: 0 additions & 5 deletions Sources/SotoCore/AWSClient+EndpointDiscovery.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ extension AWSClient {
/// - serviceConfig: AWS service configuration used in request creation and signing
/// - endpointDiscovery: Endpoint discovery helper
/// - logger: Logger
/// - eventLoop: Optional EventLoop to run everything on
public func execute(
operation operationName: String,
path: String,
Expand Down Expand Up @@ -59,7 +58,6 @@ extension AWSClient {
/// - hostPrefix: Prefix to append to host name
/// - endpointDiscovery: Endpoint discovery helper
/// - logger: Logger
/// - eventLoop: Optional EventLoop to run everything on
public func execute<Input: AWSEncodableShape>(
operation operationName: String,
path: String,
Expand Down Expand Up @@ -96,7 +94,6 @@ extension AWSClient {
/// - serviceConfig: AWS service configuration used in request creation and signing
/// - endpointDiscovery: Endpoint discovery helper
/// - logger: Logger
/// - eventLoop: Optional EventLoop to run everything on
/// - returns:
/// Output object that completes when response is received
@discardableResult public func execute<Output: AWSDecodableShape>(
Expand Down Expand Up @@ -133,7 +130,6 @@ extension AWSClient {
/// - hostPrefix: Prefix to append to host name
/// - endpointDiscovery: Endpoint discovery helper
/// - logger: Logger
/// - eventLoop: Optional EventLoop to run everything on
/// - returns:
/// Output object that completes when response is received
public func execute<Output: AWSDecodableShape, Input: AWSEncodableShape>(
Expand Down Expand Up @@ -174,7 +170,6 @@ extension AWSClient {
/// - hostPrefix: Prefix to append to host name
/// - endpointDiscovery: Endpoint discovery helper
/// - logger: Logger
/// - eventLoop: Optional EventLoop to run everything on
/// - stream: Closure to stream payload response into
/// - returns:
/// Output object that completes when response is received
Expand Down
5 changes: 3 additions & 2 deletions Sources/SotoCore/AWSClient+Paginate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ extension AWSClient {
/// - Parameters:
/// - input: Initial Input value
/// - command: Command to be paginated
/// - tokenKey: KeyPath for Output token used to setup new Input
/// - inputKey: Optional KeyPath for Input token to compare against new key from Output
/// - outputKey: KeyPath for Output token used to read new Output
/// - moreResultsKey: Optional KeyPath for value indicating whether there are more results
/// - logger: Logger
/// - eventLoop: EventLoop to run everything on
public init(
input: Input,
command: @escaping ((Input, Logger) async throws -> Output),
Expand Down
94 changes: 49 additions & 45 deletions Sources/SotoCore/AWSClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,9 @@ extension AWSClient {
configuration: serviceConfig
)
},
execute: { request, logger in
return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger)
},
processResponse: { _ in
return
processResponse: { response in
// flush response body contents to complete response read
for try await _ in response.body {}
},
config: serviceConfig,
logger: logger
Expand Down Expand Up @@ -294,11 +292,9 @@ extension AWSClient {
configuration: serviceConfig
)
},
execute: { request, logger in
return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger)
},
processResponse: { _ in
return
processResponse: { response in
// flush response body contents to complete response read
for try await _ in response.body {}
},
config: serviceConfig,
logger: logger
Expand Down Expand Up @@ -331,11 +327,8 @@ extension AWSClient {
configuration: serviceConfig
)
},
execute: { request, logger in
return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger)
},
processResponse: { response in
return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig)
return try await self.validate(operation: operationName, response: response, serviceConfig: serviceConfig)
},
config: serviceConfig,
logger: logger
Expand Down Expand Up @@ -374,11 +367,8 @@ extension AWSClient {
configuration: serviceConfig
)
},
execute: { request, logger in
return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger)
},
processResponse: { response in
return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig)
return try await self.validate(operation: operationName, response: response, serviceConfig: serviceConfig)
},
config: serviceConfig,
logger: logger
Expand Down Expand Up @@ -418,11 +408,8 @@ extension AWSClient {
configuration: serviceConfig
)
},
execute: { request, logger in
return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger, stream: stream)
},
processResponse: { response in
return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig)
return try await self.validate(operation: operationName, response: response, serviceConfig: serviceConfig)
},
config: serviceConfig,
logger: logger
Expand All @@ -433,8 +420,7 @@ extension AWSClient {
internal func execute<Output>(
operation operationName: String,
createRequest: @escaping () throws -> AWSRequest,
execute: @escaping (AWSHTTPRequest, Logger) async throws -> AWSHTTPResponse,
processResponse: @escaping (AWSHTTPResponse) throws -> Output,
processResponse: @escaping (AWSHTTPResponse) async throws -> Output,
config: AWSServiceConfig,
logger: Logger = AWSClient.loggingDisabled
) async throws -> Output {
Expand Down Expand Up @@ -467,9 +453,9 @@ extension AWSClient {
}
try Task.checkCancellation()
let response = try await self.invoke(
request: awsRequest,
with: config,
logger: logger,
request: { try await execute(awsRequest, logger) },
processResponse: processResponse,
streaming: streaming
)
Expand All @@ -494,21 +480,22 @@ extension AWSClient {
}

func invoke<Output>(
request: AWSHTTPRequest,
with serviceConfig: AWSServiceConfig,
logger: Logger,
request: @escaping () async throws -> AWSHTTPResponse,
processResponse: @escaping (AWSHTTPResponse) throws -> Output,
processResponse: @escaping (AWSHTTPResponse) async throws -> Output,
streaming: Bool
) async throws -> Output {
var attempt = 0
while true {
do {
let response = try await request()
let response = try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, logger: logger)
// if it returns an HTTP status code outside 2xx then throw an error
guard (200..<300).contains(response.status.code) else {
throw self.createError(for: response, serviceConfig: serviceConfig, logger: logger)
let error = try await self.createError(for: response, serviceConfig: serviceConfig, logger: logger)
throw error
}
let output = try processResponse(response)
let output = try await processResponse(response)
return output
} catch {
// if streaming and the error returned is an AWS error fail immediately. Do not attempt
Expand All @@ -534,7 +521,6 @@ extension AWSClient {

/// Get credential used by client
/// - Parameters:
/// - eventLoop: optional eventLoop to run operation on
/// - logger: optional logger to use
/// - Returns: Credential
public func getCredential(logger: Logger = AWSClient.loggingDisabled) async throws -> Credential {
Expand Down Expand Up @@ -611,36 +597,54 @@ extension AWSClient {
// response validator
extension AWSClient {
/// Generate an AWS Response from the operation HTTP response and return the output shape from it. This is only every called if the response includes a successful http status code
internal func validate<Output: AWSDecodableShape>(operation operationName: String, response: AWSHTTPResponse, serviceConfig: AWSServiceConfig) throws -> Output {
internal func validate<Output: AWSDecodableShape>(
operation operationName: String,
response: AWSHTTPResponse,
serviceConfig: AWSServiceConfig
) async throws -> Output {
assert((200..<300).contains(response.status.code), "Shouldn't get here if error was returned")

let raw = Output._options.contains(.rawPayload) == true
let awsResponse = try AWSResponse(from: response, serviceProtocol: serviceConfig.serviceProtocol, raw: raw)
let awsResponse = try await AWSResponse(from: response, serviceProtocol: serviceConfig.serviceProtocol, raw: raw)
.applyMiddlewares(serviceConfig.middlewares + middlewares, config: serviceConfig)

return try awsResponse.generateOutputShape(operation: operationName)
}

/// Create error from HTTPResponse. This is only called if we received an unsuccessful http status code.
internal func createError(for response: AWSHTTPResponse, serviceConfig: AWSServiceConfig, logger: Logger) -> Error {
internal func createError(for response: AWSHTTPResponse, serviceConfig: AWSServiceConfig, logger: Logger) async throws -> Error {
// if we can create an AWSResponse and create an error from it return that
if let awsResponse = try? AWSResponse(from: response, serviceProtocol: serviceConfig.serviceProtocol)
.applyMiddlewares(serviceConfig.middlewares + middlewares, config: serviceConfig),
let error = awsResponse.generateError(serviceConfig: serviceConfig, logLevel: options.errorLogLevel, logger: logger)
{
return error
} else {
let awsResponse: AWSResponse
do {
awsResponse = try await AWSResponse(from: response, serviceProtocol: serviceConfig.serviceProtocol)
} catch {
// else return "Unhandled error message" with rawBody attached
var rawBodyString: String?
if var body = response.body {
rawBodyString = body.readString(length: body.readableBytes)
}
let context = AWSErrorContext(
message: "Unhandled Error",
responseCode: response.status,
headers: response.headers
)
return AWSRawError(rawBody: rawBodyString, context: context)
return AWSRawError(rawBody: nil, context: context)
}
do {
let awsResponseWithMiddleware = try awsResponse.applyMiddlewares(serviceConfig.middlewares + middlewares, config: serviceConfig)
if let error = awsResponseWithMiddleware.generateError(
serviceConfig: serviceConfig,
logLevel: options.errorLogLevel,
logger: logger
) {
return error
} else {
// else return "Unhandled error message" with rawBody attached
let context = AWSErrorContext(
message: "Unhandled Error",
responseCode: response.status,
headers: response.headers
)
return AWSRawError(rawBody: awsResponseWithMiddleware.body.asString(), context: context)
}
} catch {
return error
}
}
}
Expand Down
60 changes: 60 additions & 0 deletions Sources/SotoCore/Concurrency/AnyAsyncSequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Soto for AWS open source project
//
// Copyright (c) 2023 the Soto project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of Soto project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2022 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@usableFromInline
struct AnyAsyncSequence<Element>: Sendable, AsyncSequence {
@usableFromInline typealias AsyncIteratorNextCallback = () async throws -> Element?

@usableFromInline struct AsyncIterator: AsyncIteratorProtocol {
@usableFromInline let nextCallback: AsyncIteratorNextCallback

@inlinable init(nextCallback: @escaping AsyncIteratorNextCallback) {
self.nextCallback = nextCallback
}

@inlinable mutating func next() async throws -> Element? {
try await self.nextCallback()
}
}

@usableFromInline var makeAsyncIteratorCallback: @Sendable () -> AsyncIteratorNextCallback

@inlinable init<SequenceOfBytes>(
_ asyncSequence: SequenceOfBytes
) where SequenceOfBytes: AsyncSequence & Sendable, SequenceOfBytes.Element == Element {
self.makeAsyncIteratorCallback = {
var iterator = asyncSequence.makeAsyncIterator()
return {
try await iterator.next()
}
}
}

@inlinable func makeAsyncIterator() -> AsyncIterator {
.init(nextCallback: self.makeAsyncIteratorCallback())
}
}
55 changes: 55 additions & 0 deletions Sources/SotoCore/Concurrency/ByteBufferSequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Soto for AWS open source project
//
// Copyright (c) 2017-2023 the Soto project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of Soto project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore
import NIOPosix

/// Provide ByteBuffer as an AsyncSequence of equal size blocks
struct ByteBufferAsyncSequence: AsyncSequence, Sendable {
typealias Element = ByteBuffer

let byteBuffer: ByteBuffer
let chunkSize: Int

init(
_ byteBuffer: ByteBuffer,
chunkSize: Int
) {
self.byteBuffer = byteBuffer
self.chunkSize = chunkSize
}

struct AsyncIterator: AsyncIteratorProtocol {
var byteBuffer: ByteBuffer
let chunkSize: Int

mutating func next() async throws -> ByteBuffer? {
let size = Swift.min(self.chunkSize, self.byteBuffer.readableBytes)
if size > 0 {
return self.byteBuffer.readSlice(length: size)
}
return nil
}
}

func makeAsyncIterator() -> AsyncIterator {
.init(byteBuffer: self.byteBuffer, chunkSize: self.chunkSize)
}
}

extension ByteBuffer {
func asyncSequence(chunkSize: Int) -> ByteBufferAsyncSequence {
return ByteBufferAsyncSequence(self, chunkSize: chunkSize)
}
}
Loading

0 comments on commit b178f33

Please sign in to comment.