From 5d37291ea7b9698e31d264c9dc8213e588bc7001 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 22 Jun 2023 08:45:58 +0100 Subject: [PATCH] Use Async calls from AsyncHTTPClient (#555) * Use async HTTPClient calls Currently streaming is not working * Comments * Extract HTTPBody out so it can be used in request * Flush body, when not expecting anything * Remove eventLoop from function comment headers --- .../AWSClient+EndpointDiscovery.swift | 5 - Sources/SotoCore/AWSClient+Paginate.swift | 5 +- Sources/SotoCore/AWSClient.swift | 94 +-- .../Concurrency/AnyAsyncSequence.swift | 60 ++ .../Concurrency/ByteBufferSequence.swift | 55 ++ .../MetaDataCredentialProvider.swift | 18 +- Sources/SotoCore/HTTP/AWSHTTPTypes.swift | 87 ++- Sources/SotoCore/HTTP/AsyncHTTPClient.swift | 106 +-- Sources/SotoCore/HTTP/ResponseDelegate.swift | 95 --- Sources/SotoCore/Message/AWSResponse.swift | 23 +- Tests/SotoCoreTests/AWSClientTests.swift | 616 +++++++++--------- Tests/SotoCoreTests/AWSResponseTests.swift | 257 +++----- Tests/SotoCoreTests/TimeStampTests.swift | 89 +-- 13 files changed, 740 insertions(+), 770 deletions(-) create mode 100644 Sources/SotoCore/Concurrency/AnyAsyncSequence.swift create mode 100644 Sources/SotoCore/Concurrency/ByteBufferSequence.swift delete mode 100644 Sources/SotoCore/HTTP/ResponseDelegate.swift diff --git a/Sources/SotoCore/AWSClient+EndpointDiscovery.swift b/Sources/SotoCore/AWSClient+EndpointDiscovery.swift index 45ccb45ba..527c2edf5 100644 --- a/Sources/SotoCore/AWSClient+EndpointDiscovery.swift +++ b/Sources/SotoCore/AWSClient+EndpointDiscovery.swift @@ -24,7 +24,6 @@ extension AWSClient { /// - serviceConfig: AWS service configuration used in request creation and signing /// - endpointDiscovery: Endpoint discovery helper /// - logger: Logger - /// - eventLoop: Optional EventLoop to run everything on public func execute( operation operationName: String, path: String, @@ -59,7 +58,6 @@ extension AWSClient { /// - hostPrefix: Prefix to append to host name /// - endpointDiscovery: Endpoint discovery helper /// - logger: Logger - /// - eventLoop: Optional EventLoop to run everything on public func execute( operation operationName: String, path: String, @@ -96,7 +94,6 @@ extension AWSClient { /// - serviceConfig: AWS service configuration used in request creation and signing /// - endpointDiscovery: Endpoint discovery helper /// - logger: Logger - /// - eventLoop: Optional EventLoop to run everything on /// - returns: /// Output object that completes when response is received @discardableResult public func execute( @@ -133,7 +130,6 @@ extension AWSClient { /// - hostPrefix: Prefix to append to host name /// - endpointDiscovery: Endpoint discovery helper /// - logger: Logger - /// - eventLoop: Optional EventLoop to run everything on /// - returns: /// Output object that completes when response is received public func execute( @@ -174,7 +170,6 @@ extension AWSClient { /// - hostPrefix: Prefix to append to host name /// - endpointDiscovery: Endpoint discovery helper /// - logger: Logger - /// - eventLoop: Optional EventLoop to run everything on /// - stream: Closure to stream payload response into /// - returns: /// Output object that completes when response is received diff --git a/Sources/SotoCore/AWSClient+Paginate.swift b/Sources/SotoCore/AWSClient+Paginate.swift index 5c9006ddb..f49b60901 100644 --- a/Sources/SotoCore/AWSClient+Paginate.swift +++ b/Sources/SotoCore/AWSClient+Paginate.swift @@ -37,9 +37,10 @@ extension AWSClient { /// - Parameters: /// - input: Initial Input value /// - command: Command to be paginated - /// - tokenKey: KeyPath for Output token used to setup new Input + /// - inputKey: Optional KeyPath for Input token to compare against new key from Output + /// - outputKey: KeyPath for Output token used to read new Output + /// - moreResultsKey: Optional KeyPath for value indicating whether there are more results /// - logger: Logger - /// - eventLoop: EventLoop to run everything on public init( input: Input, command: @escaping ((Input, Logger) async throws -> Output), diff --git a/Sources/SotoCore/AWSClient.swift b/Sources/SotoCore/AWSClient.swift index afe95082e..24098ec74 100644 --- a/Sources/SotoCore/AWSClient.swift +++ b/Sources/SotoCore/AWSClient.swift @@ -259,11 +259,9 @@ extension AWSClient { configuration: serviceConfig ) }, - execute: { request, logger in - return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger) - }, - processResponse: { _ in - return + processResponse: { response in + // flush response body contents to complete response read + for try await _ in response.body {} }, config: serviceConfig, logger: logger @@ -294,11 +292,9 @@ extension AWSClient { configuration: serviceConfig ) }, - execute: { request, logger in - return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger) - }, - processResponse: { _ in - return + processResponse: { response in + // flush response body contents to complete response read + for try await _ in response.body {} }, config: serviceConfig, logger: logger @@ -331,11 +327,8 @@ extension AWSClient { configuration: serviceConfig ) }, - execute: { request, logger in - return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger) - }, processResponse: { response in - return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig) + return try await self.validate(operation: operationName, response: response, serviceConfig: serviceConfig) }, config: serviceConfig, logger: logger @@ -374,11 +367,8 @@ extension AWSClient { configuration: serviceConfig ) }, - execute: { request, logger in - return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger) - }, processResponse: { response in - return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig) + return try await self.validate(operation: operationName, response: response, serviceConfig: serviceConfig) }, config: serviceConfig, logger: logger @@ -418,11 +408,8 @@ extension AWSClient { configuration: serviceConfig ) }, - execute: { request, logger in - return try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, on: self.eventLoopGroup.any(), logger: logger, stream: stream) - }, processResponse: { response in - return try self.validate(operation: operationName, response: response, serviceConfig: serviceConfig) + return try await self.validate(operation: operationName, response: response, serviceConfig: serviceConfig) }, config: serviceConfig, logger: logger @@ -433,8 +420,7 @@ extension AWSClient { internal func execute( operation operationName: String, createRequest: @escaping () throws -> AWSRequest, - execute: @escaping (AWSHTTPRequest, Logger) async throws -> AWSHTTPResponse, - processResponse: @escaping (AWSHTTPResponse) throws -> Output, + processResponse: @escaping (AWSHTTPResponse) async throws -> Output, config: AWSServiceConfig, logger: Logger = AWSClient.loggingDisabled ) async throws -> Output { @@ -467,9 +453,9 @@ extension AWSClient { } try Task.checkCancellation() let response = try await self.invoke( + request: awsRequest, with: config, logger: logger, - request: { try await execute(awsRequest, logger) }, processResponse: processResponse, streaming: streaming ) @@ -494,21 +480,22 @@ extension AWSClient { } func invoke( + request: AWSHTTPRequest, with serviceConfig: AWSServiceConfig, logger: Logger, - request: @escaping () async throws -> AWSHTTPResponse, - processResponse: @escaping (AWSHTTPResponse) throws -> Output, + processResponse: @escaping (AWSHTTPResponse) async throws -> Output, streaming: Bool ) async throws -> Output { var attempt = 0 while true { do { - let response = try await request() + let response = try await self.httpClient.execute(request: request, timeout: serviceConfig.timeout, logger: logger) // if it returns an HTTP status code outside 2xx then throw an error guard (200..<300).contains(response.status.code) else { - throw self.createError(for: response, serviceConfig: serviceConfig, logger: logger) + let error = try await self.createError(for: response, serviceConfig: serviceConfig, logger: logger) + throw error } - let output = try processResponse(response) + let output = try await processResponse(response) return output } catch { // if streaming and the error returned is an AWS error fail immediately. Do not attempt @@ -534,7 +521,6 @@ extension AWSClient { /// Get credential used by client /// - Parameters: - /// - eventLoop: optional eventLoop to run operation on /// - logger: optional logger to use /// - Returns: Credential public func getCredential(logger: Logger = AWSClient.loggingDisabled) async throws -> Credential { @@ -611,36 +597,54 @@ extension AWSClient { // response validator extension AWSClient { /// Generate an AWS Response from the operation HTTP response and return the output shape from it. This is only every called if the response includes a successful http status code - internal func validate(operation operationName: String, response: AWSHTTPResponse, serviceConfig: AWSServiceConfig) throws -> Output { + internal func validate( + operation operationName: String, + response: AWSHTTPResponse, + serviceConfig: AWSServiceConfig + ) async throws -> Output { assert((200..<300).contains(response.status.code), "Shouldn't get here if error was returned") let raw = Output._options.contains(.rawPayload) == true - let awsResponse = try AWSResponse(from: response, serviceProtocol: serviceConfig.serviceProtocol, raw: raw) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: serviceConfig.serviceProtocol, raw: raw) .applyMiddlewares(serviceConfig.middlewares + middlewares, config: serviceConfig) return try awsResponse.generateOutputShape(operation: operationName) } /// Create error from HTTPResponse. This is only called if we received an unsuccessful http status code. - internal func createError(for response: AWSHTTPResponse, serviceConfig: AWSServiceConfig, logger: Logger) -> Error { + internal func createError(for response: AWSHTTPResponse, serviceConfig: AWSServiceConfig, logger: Logger) async throws -> Error { // if we can create an AWSResponse and create an error from it return that - if let awsResponse = try? AWSResponse(from: response, serviceProtocol: serviceConfig.serviceProtocol) - .applyMiddlewares(serviceConfig.middlewares + middlewares, config: serviceConfig), - let error = awsResponse.generateError(serviceConfig: serviceConfig, logLevel: options.errorLogLevel, logger: logger) - { - return error - } else { + let awsResponse: AWSResponse + do { + awsResponse = try await AWSResponse(from: response, serviceProtocol: serviceConfig.serviceProtocol) + } catch { // else return "Unhandled error message" with rawBody attached - var rawBodyString: String? - if var body = response.body { - rawBodyString = body.readString(length: body.readableBytes) - } let context = AWSErrorContext( message: "Unhandled Error", responseCode: response.status, headers: response.headers ) - return AWSRawError(rawBody: rawBodyString, context: context) + return AWSRawError(rawBody: nil, context: context) + } + do { + let awsResponseWithMiddleware = try awsResponse.applyMiddlewares(serviceConfig.middlewares + middlewares, config: serviceConfig) + if let error = awsResponseWithMiddleware.generateError( + serviceConfig: serviceConfig, + logLevel: options.errorLogLevel, + logger: logger + ) { + return error + } else { + // else return "Unhandled error message" with rawBody attached + let context = AWSErrorContext( + message: "Unhandled Error", + responseCode: response.status, + headers: response.headers + ) + return AWSRawError(rawBody: awsResponseWithMiddleware.body.asString(), context: context) + } + } catch { + return error } } } diff --git a/Sources/SotoCore/Concurrency/AnyAsyncSequence.swift b/Sources/SotoCore/Concurrency/AnyAsyncSequence.swift new file mode 100644 index 000000000..a6547eb86 --- /dev/null +++ b/Sources/SotoCore/Concurrency/AnyAsyncSequence.swift @@ -0,0 +1,60 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Soto for AWS open source project +// +// Copyright (c) 2023 the Soto project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Soto project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2022 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@usableFromInline +struct AnyAsyncSequence: Sendable, AsyncSequence { + @usableFromInline typealias AsyncIteratorNextCallback = () async throws -> Element? + + @usableFromInline struct AsyncIterator: AsyncIteratorProtocol { + @usableFromInline let nextCallback: AsyncIteratorNextCallback + + @inlinable init(nextCallback: @escaping AsyncIteratorNextCallback) { + self.nextCallback = nextCallback + } + + @inlinable mutating func next() async throws -> Element? { + try await self.nextCallback() + } + } + + @usableFromInline var makeAsyncIteratorCallback: @Sendable () -> AsyncIteratorNextCallback + + @inlinable init( + _ asyncSequence: SequenceOfBytes + ) where SequenceOfBytes: AsyncSequence & Sendable, SequenceOfBytes.Element == Element { + self.makeAsyncIteratorCallback = { + var iterator = asyncSequence.makeAsyncIterator() + return { + try await iterator.next() + } + } + } + + @inlinable func makeAsyncIterator() -> AsyncIterator { + .init(nextCallback: self.makeAsyncIteratorCallback()) + } +} diff --git a/Sources/SotoCore/Concurrency/ByteBufferSequence.swift b/Sources/SotoCore/Concurrency/ByteBufferSequence.swift new file mode 100644 index 000000000..98d6b9784 --- /dev/null +++ b/Sources/SotoCore/Concurrency/ByteBufferSequence.swift @@ -0,0 +1,55 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Soto for AWS open source project +// +// Copyright (c) 2017-2023 the Soto project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Soto project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore +import NIOPosix + +/// Provide ByteBuffer as an AsyncSequence of equal size blocks +struct ByteBufferAsyncSequence: AsyncSequence, Sendable { + typealias Element = ByteBuffer + + let byteBuffer: ByteBuffer + let chunkSize: Int + + init( + _ byteBuffer: ByteBuffer, + chunkSize: Int + ) { + self.byteBuffer = byteBuffer + self.chunkSize = chunkSize + } + + struct AsyncIterator: AsyncIteratorProtocol { + var byteBuffer: ByteBuffer + let chunkSize: Int + + mutating func next() async throws -> ByteBuffer? { + let size = Swift.min(self.chunkSize, self.byteBuffer.readableBytes) + if size > 0 { + return self.byteBuffer.readSlice(length: size) + } + return nil + } + } + + func makeAsyncIterator() -> AsyncIterator { + .init(byteBuffer: self.byteBuffer, chunkSize: self.chunkSize) + } +} + +extension ByteBuffer { + func asyncSequence(chunkSize: Int) -> ByteBufferAsyncSequence { + return ByteBufferAsyncSequence(self, chunkSize: chunkSize) + } +} diff --git a/Sources/SotoCore/Credential/MetaDataCredentialProvider.swift b/Sources/SotoCore/Credential/MetaDataCredentialProvider.swift index a0c5b64b2..1f002ec28 100644 --- a/Sources/SotoCore/Credential/MetaDataCredentialProvider.swift +++ b/Sources/SotoCore/Credential/MetaDataCredentialProvider.swift @@ -67,6 +67,12 @@ extension MetaDataClient { } } +struct MetadataHTTPResponse { + public var status: HTTPResponseStatus + public var headers: HTTPHeaders + public var body: ByteBuffer? +} + struct ECSMetaDataClient: MetaDataClient { typealias MetaData = ECSMetaData @@ -111,17 +117,18 @@ struct ECSMetaDataClient: MetaDataClient { } func getMetaData(logger: Logger) async throws -> ECSMetaData { - let response = try await request(url: endpointURL, timeout: 2, logger: logger) + let response = try await request(url: endpointURL, timeout: .seconds(2), logger: logger) guard let body = response.body else { throw MetaDataClientError.missingMetaData } return try self.decoder.wrappedValue.decode(MetaData.self, from: body) } - private func request(url: String, timeout: TimeInterval, logger: Logger) async throws -> AWSHTTPResponse { + private func request(url: String, timeout: TimeAmount, logger: Logger) async throws -> MetadataHTTPResponse { try Task.checkCancellation() let request = AWSHTTPRequest(url: URL(string: url)!, method: .GET, headers: [:], body: .empty) - return try await httpClient.execute(request: request, timeout: TimeAmount.seconds(2), logger: logger) + let response = try await httpClient.execute(request: request, timeout: timeout, logger: logger) + return try await .init(status: response.status, headers: response.headers, body: response.body.collect(upTo: .max)) } } @@ -243,9 +250,10 @@ struct InstanceMetaDataClient: MetaDataClient { method: HTTPMethod = .GET, headers: HTTPHeaders = .init(), logger: Logger - ) async throws -> AWSHTTPResponse { + ) async throws -> MetadataHTTPResponse { try Task.checkCancellation() let request = AWSHTTPRequest(url: url, method: method, headers: headers, body: .empty) - return try await httpClient.execute(request: request, timeout: TimeAmount.seconds(2), logger: logger) + let response = try await httpClient.execute(request: request, timeout: TimeAmount.seconds(2), logger: logger) + return try await .init(status: response.status, headers: response.headers, body: response.body.collect(upTo: .max)) } } diff --git a/Sources/SotoCore/HTTP/AWSHTTPTypes.swift b/Sources/SotoCore/HTTP/AWSHTTPTypes.swift index f49842d7e..d1787972a 100644 --- a/Sources/SotoCore/HTTP/AWSHTTPTypes.swift +++ b/Sources/SotoCore/HTTP/AWSHTTPTypes.swift @@ -20,6 +20,68 @@ import NIOHTTP1 /// Function that streamed response chunks are sent ot public typealias AWSResponseStream = (ByteBuffer, EventLoop) -> EventLoopFuture +/// Storage for HTTP body which can be either a ByteBuffer or an AsyncSequence of +/// ByteBuffers +struct HTTPBody { + enum Storage { + case byteBuffer(ByteBuffer) + case asyncSequence(sequence: AnyAsyncSequence, length: Int?) + } + + let storage: Storage + + init() { + self.storage = .byteBuffer(ByteBuffer()) + } + + init(_ byteBuffer: ByteBuffer) { + self.storage = .byteBuffer(byteBuffer) + } + + init(_ sequence: BufferSequence, length: Int?) where BufferSequence.Element == ByteBuffer { + self.storage = .asyncSequence(sequence: .init(sequence), length: length) + } + + func collect(upTo length: Int) async throws -> ByteBuffer { + switch self.storage { + case .byteBuffer(let buffer): + return buffer + case .asyncSequence(let sequence, _): + return try await sequence.collect(upTo: length) + } + } + + var length: Int? { + switch self.storage { + case .byteBuffer(let buffer): + return buffer.readableBytes + case .asyncSequence(_, let length): + return length + } + } + + var isStreaming: Bool { + if case .asyncSequence = self.storage { + return true + } + return false + } +} + +extension HTTPBody: AsyncSequence { + typealias Element = ByteBuffer + typealias AsyncIterator = AnyAsyncSequence.AsyncIterator + + func makeAsyncIterator() -> AsyncIterator { + switch self.storage { + case .byteBuffer(let buffer): + return AnyAsyncSequence(buffer.asyncSequence(chunkSize: buffer.readableBytes)).makeAsyncIterator() + case .asyncSequence(let sequence, _): + return sequence.makeAsyncIterator() + } + } +} + /// HTTP Request struct AWSHTTPRequest { let url: URL @@ -35,12 +97,21 @@ struct AWSHTTPRequest { } } -/// HTTP Response -protocol AWSHTTPResponse { - /// HTTP response status - var status: HTTPResponseStatus { get } - /// HTTP response headers - var headers: HTTPHeaders { get } - /// Payload of response - var body: ByteBuffer? { get } +/// Generic HTTP Response returned from HTTP Client +struct AWSHTTPResponse: Sendable { + /// Initialize AWSHTTPResponse + init(status: HTTPResponseStatus, headers: HTTPHeaders, body: HTTPBody = .init()) { + self.status = status + self.headers = headers + self.body = body + } + + /// The HTTP status for this response. + var status: HTTPResponseStatus + + /// The HTTP headers of this response. + var headers: HTTPHeaders + + /// The body of this HTTP response. + var body: HTTPBody } diff --git a/Sources/SotoCore/HTTP/AsyncHTTPClient.swift b/Sources/SotoCore/HTTP/AsyncHTTPClient.swift index ff70eab4a..b4028422c 100644 --- a/Sources/SotoCore/HTTP/AsyncHTTPClient.swift +++ b/Sources/SotoCore/HTTP/AsyncHTTPClient.swift @@ -27,103 +27,41 @@ extension AsyncHTTPClient.HTTPClient { func execute( request: AWSHTTPRequest, timeout: TimeAmount, - on eventLoop: EventLoop, logger: Logger - ) -> EventLoopFuture { - let requestBody: AsyncHTTPClient.HTTPClient.Body? - var requestHeaders = request.headers + ) async throws -> AWSHTTPResponse { + let requestBody: HTTPClientRequest.Body? switch request.body.payload { case .byteBuffer(let byteBuffer): - requestBody = .byteBuffer(byteBuffer) - case .stream(let reader): - requestHeaders = reader.updateHeaders(headers: requestHeaders) - requestBody = .stream(length: reader.contentSize) { writer in - return writer.write(reader: reader, on: eventLoop) - } - case .empty: + requestBody = .bytes(byteBuffer) + /* case .asyncSequence(let sequence, let length): + requestBody = .stream( + sequence, + length: length.map { .known($0) } ?? .unknown + ) */ + default: requestBody = nil } - do { - let asyncRequest = try AsyncHTTPClient.HTTPClient.Request( - url: request.url, - method: request.method, - headers: requestHeaders, - body: requestBody - ) - return self.execute( - request: asyncRequest, - eventLoop: .delegate(on: eventLoop), - deadline: .now() + timeout, - logger: logger - ).map { $0 } - } catch { - return eventLoopGroup.next().makeFailedFuture(error) - } - } + var httpRequest = HTTPClientRequest(url: request.url.absoluteString) + httpRequest.method = request.method + httpRequest.headers = request.headers + httpRequest.body = requestBody - func execute( - request: AWSHTTPRequest, - timeout: TimeAmount, - on eventLoop: EventLoop, - logger: Logger, - stream: @escaping AWSResponseStream - ) -> EventLoopFuture { - let requestBody: AsyncHTTPClient.HTTPClient.Body? - if case .byteBuffer(let body) = request.body.payload { - requestBody = .byteBuffer(body) - } else { - requestBody = nil - } - do { - let asyncRequest = try AsyncHTTPClient.HTTPClient.Request( - url: request.url, - method: request.method, - headers: request.headers, - body: requestBody - ) - let delegate = AWSHTTPClientResponseDelegate(host: asyncRequest.host, stream: stream) - return self.execute( - request: asyncRequest, - delegate: delegate, - eventLoop: .delegate(on: eventLoop), - deadline: .now() + timeout, - logger: logger - ).futureResult - } catch { - return eventLoopGroup.next().makeFailedFuture(error) - } + let response = try await self.execute(httpRequest, timeout: timeout, logger: logger) + return .init( + status: response.status, + headers: response.headers, + body: .init(response.body, length: nil) + ) } -} -extension AsyncHTTPClient.HTTPClient.Response: AWSHTTPResponse {} - -@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) -extension AsyncHTTPClient.HTTPClient { - /// Execute HTTP request - /// - Parameters: - /// - request: HTTP request - /// - timeout: If execution is idle for longer than timeout then throw error - /// - eventLoop: eventLoop to run request on - /// - Returns: EventLoopFuture that will be fulfilled with request response func execute( request: AWSHTTPRequest, timeout: TimeAmount, - on eventLoop: EventLoop? = nil, - logger: Logger - ) async throws -> AWSHTTPResponse { - let eventLoop = eventLoop ?? self.eventLoopGroup.any() - return try await self.execute(request: request, timeout: timeout, on: eventLoop, logger: logger).get() - } - - func execute( - request: AWSHTTPRequest, - timeout: TimeAmount, - on eventLoop: EventLoop? = nil, + on eventLoop: EventLoop, logger: Logger, stream: @escaping AWSResponseStream - ) async throws -> AWSHTTPResponse { - let eventLoop = eventLoop ?? self.eventLoopGroup.any() - return try await self.execute(request: request, timeout: timeout, on: eventLoop, logger: logger, stream: stream).get() + ) async throws -> HTTPClientResponse { + preconditionFailure("Not supported") } } diff --git a/Sources/SotoCore/HTTP/ResponseDelegate.swift b/Sources/SotoCore/HTTP/ResponseDelegate.swift deleted file mode 100644 index ff71f901e..000000000 --- a/Sources/SotoCore/HTTP/ResponseDelegate.swift +++ /dev/null @@ -1,95 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Soto for AWS open source project -// -// Copyright (c) 2017-2020 the Soto project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of Soto project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import AsyncHTTPClient -import Foundation -import NIOCore -import NIOHTTP1 - -/// HTTP client delegate capturing the body parts received from AsyncHTTPClient. -class AWSHTTPClientResponseDelegate: HTTPClientResponseDelegate { - typealias Response = AWSHTTPResponse - - enum State { - case idle - case head(HTTPResponseHead) - case end - case error(Error) - } - - let host: String - let stream: (ByteBuffer, EventLoop) -> EventLoopFuture - var state: State - - init(host: String, stream: @escaping (ByteBuffer, EventLoop) -> EventLoopFuture) { - self.host = host - self.stream = stream - self.state = .idle - } - - func didReceiveHead(task: HTTPClient.Task, _ head: HTTPResponseHead) -> EventLoopFuture { - switch self.state { - case .idle: - self.state = .head(head) - case .head: - preconditionFailure("head already set") - case .end: - preconditionFailure("request already processed") - case .error: - break - } - return task.eventLoop.makeSucceededFuture(()) - } - - func didReceiveBodyPart(task: HTTPClient.Task, _ part: ByteBuffer) -> EventLoopFuture { - switch self.state { - case .idle: - preconditionFailure("no head received before body") - case .head(let head): - if (200..<300).contains(head.status.code) { - let futureResult = self.stream(part, task.eventLoop) - return futureResult - } - self.state = .head(head) - case .end: - preconditionFailure("request already processed") - case .error: - break - } - return task.eventLoop.makeSucceededFuture(()) - } - - func didReceiveError(task: HTTPClient.Task, _ error: Error) { - self.state = .error(error) - } - - func didFinishRequest(task: HTTPClient.Task) throws -> AWSHTTPResponse { - switch self.state { - case .idle: - preconditionFailure("no head received before end") - case .head(let head): - return AsyncHTTPClient.HTTPClient.Response( - host: self.host, - status: head.status, - version: .init(major: 1, minor: 1), - headers: head.headers, - body: nil - ) - case .end: - preconditionFailure("request already processed") - case .error(let error): - throw error - } - } -} diff --git a/Sources/SotoCore/Message/AWSResponse.swift b/Sources/SotoCore/Message/AWSResponse.swift index 1462e1f73..638244013 100644 --- a/Sources/SotoCore/Message/AWSResponse.swift +++ b/Sources/SotoCore/Message/AWSResponse.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import AsyncHTTPClient import class Foundation.JSONDecoder import class Foundation.JSONSerialization import Logging @@ -33,26 +34,25 @@ public struct AWSResponse { /// - from: Raw HTTP Response /// - serviceProtocol: protocol of service (.json, .xml, .query etc) /// - raw: Whether Body should be treated as raw data - init(from response: AWSHTTPResponse, serviceProtocol: ServiceProtocol, raw: Bool = false) throws { + init(from response: AWSHTTPResponse, serviceProtocol: ServiceProtocol, raw: Bool = false) async throws { self.status = response.status // headers self.headers = response.headers // body - guard let body = response.body, - body.readableBytes > 0 - else { + let buffer = try await response.body.collect(upTo: .max) + if buffer.readableBytes == 0 { self.body = .empty return } if raw { - self.body = .raw(.byteBuffer(body)) + self.body = .raw(.byteBuffer(buffer)) return } - if body.readableBytes == 0 { + if buffer.readableBytes == 0 { self.body = .empty return } @@ -61,14 +61,13 @@ public struct AWSResponse { switch serviceProtocol { case .json, .restjson: - responseBody = .json(body) + responseBody = .json(buffer) case .restxml, .query, .ec2: - if let xmlString = body.getString(at: body.readerIndex, length: body.readableBytes) { - let xmlDocument = try XML.Document(string: xmlString) - if let element = xmlDocument.rootElement() { - responseBody = .xml(element) - } + let xmlString = String(buffer: buffer) + let xmlDocument = try XML.Document(string: xmlString) + if let element = xmlDocument.rootElement() { + responseBody = .xml(element) } } self.body = responseBody diff --git a/Tests/SotoCoreTests/AWSClientTests.swift b/Tests/SotoCoreTests/AWSClientTests.swift index a00861513..78db6a160 100644 --- a/Tests/SotoCoreTests/AWSClientTests.swift +++ b/Tests/SotoCoreTests/AWSClientTests.swift @@ -261,217 +261,217 @@ class AWSClientTests: XCTestCase { try await responseTask } - func testRequestStreaming() async throws { - let awsServer = AWSTestServer(serviceProtocol: .json) - let httpClient = HTTPClient(eventLoopGroupProvider: .createNew) - let config = createServiceConfig(endpoint: awsServer.address) - let client = createAWSClient(credentialProvider: .empty, httpClientProvider: .shared(httpClient)) - defer { - XCTAssertNoThrow(try awsServer.stop()) - XCTAssertNoThrow(try client.syncShutdown()) - XCTAssertNoThrow(try httpClient.syncShutdown()) - } - - try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 128 * 1024, blockSize: 16 * 1024) - try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 128 * 1024, blockSize: 17 * 1024) - try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 18 * 1024, blockSize: 47 * 1024) - } - - func testRequestS3Streaming() async throws { - let awsServer = AWSTestServer(serviceProtocol: .json) - let httpClient = HTTPClient(eventLoopGroupProvider: .createNew) - let config = createServiceConfig(service: "s3", endpoint: awsServer.address) - let client = createAWSClient(credentialProvider: .static(accessKeyId: "foo", secretAccessKey: "bar"), httpClientProvider: .shared(httpClient)) - defer { - XCTAssertNoThrow(try client.syncShutdown()) - XCTAssertNoThrow(try awsServer.stop()) - XCTAssertNoThrow(try httpClient.syncShutdown()) - } - - try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 128 * 1024, blockSize: 16 * 1024) - try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 192 * 1024, blockSize: 128 * 1024) - try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 81 * 1024, blockSize: 16 * 1024) - try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 128 * 1024, blockSize: S3ChunkedStreamReader.bufferSize) - try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 130 * 1024, blockSize: S3ChunkedStreamReader.bufferSize) - try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 128 * 1024, blockSize: 17 * 1024) - try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 68 * 1024, blockSize: 67 * 1024) - try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 65537, blockSize: 65537) - try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 65552, blockSize: 65552) - } - - func testRequestStreamingAvoidStackOverflow() async throws { - let awsServer = AWSTestServer(serviceProtocol: .json) - let httpClient = HTTPClient(eventLoopGroupProvider: .createNew) - let config = createServiceConfig(service: "s3", endpoint: awsServer.address) - let client = createAWSClient(credentialProvider: .static(accessKeyId: "foo", secretAccessKey: "bar"), httpClientProvider: .shared(httpClient)) - defer { - XCTAssertNoThrow(try client.syncShutdown()) - XCTAssertNoThrow(try awsServer.stop()) - XCTAssertNoThrow(try httpClient.syncShutdown()) - } - - try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 16 * 1024, blockSize: 8) - } - - func testRequestStreamingWithPayload(_ payload: AWSPayload) async throws { - struct Input: AWSEncodableShape & AWSShapeWithPayload { - static var _payloadPath: String = "payload" - static var _options: AWSShapeOptions = [.allowStreaming] - let payload: AWSPayload - private enum CodingKeys: CodingKey {} - } - - let awsServer = AWSTestServer(serviceProtocol: .json) - let httpClient = HTTPClient(eventLoopGroupProvider: .createNew) - let config = createServiceConfig(endpoint: awsServer.address) - let client = createAWSClient(credentialProvider: .empty, httpClientProvider: .shared(httpClient)) - defer { - // ignore error - try? awsServer.stop() - XCTAssertNoThrow(try client.syncShutdown()) - XCTAssertNoThrow(try httpClient.syncShutdown()) - } - let input = Input(payload: payload) - async let responseTask: Void = client.execute(operation: "test", path: "/", httpMethod: .POST, serviceConfig: config, input: input, logger: TestEnvironment.logger) - try await responseTask - } - - func testRequestStreamingTooMuchData() async throws { - // set up stream of 8 bytes but supply more than that - let payload = AWSPayload.stream(size: 8) { eventLoop in - var buffer = ByteBufferAllocator().buffer(capacity: 0) - buffer.writeString("String longer than 8 bytes") - return eventLoop.makeSucceededFuture(.byteBuffer(buffer)) - } - do { - try await self.testRequestStreamingWithPayload(payload) - XCTFail("Should not get here") - } catch { - XCTAssertEqual(error as? AWSClient.ClientError, .tooMuchData) - } - } - - func testRequestStreamingNotEnoughData() async throws { - var byteBuffer = ByteBufferAllocator().buffer(staticString: "Buffer") - let payload = AWSPayload.stream(size: byteBuffer.readableBytes + 1) { eventLoop in - let size = byteBuffer.readableBytes - if size == 0 { - return eventLoop.makeSucceededFuture(.end) - } - let buffer = byteBuffer.readSlice(length: size)! - return eventLoop.makeSucceededFuture(.byteBuffer(buffer)) - } - do { - try await self.testRequestStreamingWithPayload(payload) - XCTFail("Should not get here") - } catch { - XCTAssertEqual(error as? AWSClient.ClientError, .notEnoughData) - } - } - - func testRequestStreamingFile() async throws { - struct Input: AWSEncodableShape & AWSShapeWithPayload { - static var _payloadPath: String = "payload" - static var _options: AWSShapeOptions = [.allowStreaming] - let payload: AWSPayload - private enum CodingKeys: CodingKey {} - } - - let awsServer = AWSTestServer(serviceProtocol: .json) - let httpClient = HTTPClient(eventLoopGroupProvider: .createNew) - let config = createServiceConfig(endpoint: awsServer.address) - let client = createAWSClient(credentialProvider: .empty, httpClientProvider: .shared(httpClient)) - defer { - XCTAssertNoThrow(try awsServer.stop()) - XCTAssertNoThrow(try client.syncShutdown()) - XCTAssertNoThrow(try httpClient.syncShutdown()) - } - do { - let bufferSize = 208 * 1024 - let data = Data(createRandomBuffer(45, 9182, size: bufferSize)) - let filename = "testRequestStreamingFile" - let fileURL = URL(fileURLWithPath: filename) - try data.write(to: fileURL) - defer { - XCTAssertNoThrow(try FileManager.default.removeItem(at: fileURL)) - } - - let threadPool = NIOThreadPool(numberOfThreads: 3) - threadPool.start() - let fileIO = NonBlockingFileIO(threadPool: threadPool) - let fileHandle = try await fileIO.openFile(path: filename, mode: .read, eventLoop: httpClient.eventLoopGroup.next()).get() - defer { - XCTAssertNoThrow(try fileHandle.close()) - XCTAssertNoThrow(try threadPool.syncShutdownGracefully()) - } - - let input = Input(payload: .fileHandle(fileHandle, size: bufferSize, fileIO: fileIO) { size in print(size) }) - async let responseTask: Void = client.execute(operation: "test", path: "/", httpMethod: .POST, serviceConfig: config, input: input, logger: TestEnvironment.logger) - - try awsServer.processRaw { request in - XCTAssertNil(request.headers["transfer-encoding"]) - XCTAssertEqual(request.headers["Content-Length"], bufferSize.description) - let requestData = request.body.getData(at: 0, length: request.body.readableBytes) - XCTAssertEqual(requestData, data) - let response = AWSTestServer.Response(httpStatus: .ok, headers: [:], body: nil) - return .result(response) - } - - try await responseTask - } catch let error as AWSClient.ClientError where error == .tooMuchData { - } catch { - XCTFail("Unexpected error: \(error)") - } - } - - func testRequestChunkedStreaming() async throws { - struct Input: AWSEncodableShape & AWSShapeWithPayload { - static var _payloadPath: String = "payload" - static var _options: AWSShapeOptions = [.allowStreaming, .allowChunkedStreaming, .rawPayload] - let payload: AWSPayload - private enum CodingKeys: CodingKey {} - } - - let awsServer = AWSTestServer(serviceProtocol: .json) - let httpClient = HTTPClient(eventLoopGroupProvider: .createNew) - let config = createServiceConfig(endpoint: awsServer.address) - let client = createAWSClient(credentialProvider: .empty, httpClientProvider: .shared(httpClient)) - defer { - XCTAssertNoThrow(try awsServer.stop()) - XCTAssertNoThrow(try client.syncShutdown()) - XCTAssertNoThrow(try httpClient.syncShutdown()) - } - do { - // supply buffer in 16k blocks - let bufferSize = 145 * 1024 - let blockSize = 16 * 1024 - let data = createRandomBuffer(45, 9182, size: bufferSize) - var byteBuffer = ByteBufferAllocator().buffer(capacity: bufferSize) - byteBuffer.writeBytes(data) - - let payload = AWSPayload.stream { eventLoop in - let size = min(blockSize, byteBuffer.readableBytes) - if size == 0 { - return eventLoop.makeSucceededFuture(.end) - } else { - return eventLoop.makeSucceededFuture(.byteBuffer(byteBuffer.readSlice(length: size)!)) - } - } - let input = Input(payload: payload) - async let responseTask: Void = client.execute(operation: "test", path: "/", httpMethod: .POST, serviceConfig: config, input: input, logger: TestEnvironment.logger) - - try awsServer.processRaw { request in - let bytes = request.body.getBytes(at: 0, length: request.body.readableBytes) - XCTAssertTrue(bytes == data) - let response = AWSTestServer.Response(httpStatus: .ok, headers: [:], body: nil) - return .result(response) - } - - try await responseTask - } catch { - XCTFail("Unexpected error: \(error)") - } - } + /* func testRequestStreaming() async throws { + let awsServer = AWSTestServer(serviceProtocol: .json) + let httpClient = HTTPClient(eventLoopGroupProvider: .createNew) + let config = createServiceConfig(endpoint: awsServer.address) + let client = createAWSClient(credentialProvider: .empty, httpClientProvider: .shared(httpClient)) + defer { + XCTAssertNoThrow(try awsServer.stop()) + XCTAssertNoThrow(try client.syncShutdown()) + XCTAssertNoThrow(try httpClient.syncShutdown()) + } + + try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 128 * 1024, blockSize: 16 * 1024) + try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 128 * 1024, blockSize: 17 * 1024) + try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 18 * 1024, blockSize: 47 * 1024) + } + + func testRequestS3Streaming() async throws { + let awsServer = AWSTestServer(serviceProtocol: .json) + let httpClient = HTTPClient(eventLoopGroupProvider: .createNew) + let config = createServiceConfig(service: "s3", endpoint: awsServer.address) + let client = createAWSClient(credentialProvider: .static(accessKeyId: "foo", secretAccessKey: "bar"), httpClientProvider: .shared(httpClient)) + defer { + XCTAssertNoThrow(try client.syncShutdown()) + XCTAssertNoThrow(try awsServer.stop()) + XCTAssertNoThrow(try httpClient.syncShutdown()) + } + + try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 128 * 1024, blockSize: 16 * 1024) + try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 192 * 1024, blockSize: 128 * 1024) + try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 81 * 1024, blockSize: 16 * 1024) + try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 128 * 1024, blockSize: S3ChunkedStreamReader.bufferSize) + try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 130 * 1024, blockSize: S3ChunkedStreamReader.bufferSize) + try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 128 * 1024, blockSize: 17 * 1024) + try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 68 * 1024, blockSize: 67 * 1024) + try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 65537, blockSize: 65537) + try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 65552, blockSize: 65552) + } + + func testRequestStreamingAvoidStackOverflow() async throws { + let awsServer = AWSTestServer(serviceProtocol: .json) + let httpClient = HTTPClient(eventLoopGroupProvider: .createNew) + let config = createServiceConfig(service: "s3", endpoint: awsServer.address) + let client = createAWSClient(credentialProvider: .static(accessKeyId: "foo", secretAccessKey: "bar"), httpClientProvider: .shared(httpClient)) + defer { + XCTAssertNoThrow(try client.syncShutdown()) + XCTAssertNoThrow(try awsServer.stop()) + XCTAssertNoThrow(try httpClient.syncShutdown()) + } + + try await self.testRequestStreaming(config: config, client: client, server: awsServer, bufferSize: 16 * 1024, blockSize: 8) + } + + func testRequestStreamingWithPayload(_ payload: AWSPayload) async throws { + struct Input: AWSEncodableShape & AWSShapeWithPayload { + static var _payloadPath: String = "payload" + static var _options: AWSShapeOptions = [.allowStreaming] + let payload: AWSPayload + private enum CodingKeys: CodingKey {} + } + + let awsServer = AWSTestServer(serviceProtocol: .json) + let httpClient = HTTPClient(eventLoopGroupProvider: .createNew) + let config = createServiceConfig(endpoint: awsServer.address) + let client = createAWSClient(credentialProvider: .empty, httpClientProvider: .shared(httpClient)) + defer { + // ignore error + try? awsServer.stop() + XCTAssertNoThrow(try client.syncShutdown()) + XCTAssertNoThrow(try httpClient.syncShutdown()) + } + let input = Input(payload: payload) + async let responseTask: Void = client.execute(operation: "test", path: "/", httpMethod: .POST, serviceConfig: config, input: input, logger: TestEnvironment.logger) + try await responseTask + } + + func testRequestStreamingTooMuchData() async throws { + // set up stream of 8 bytes but supply more than that + let payload = AWSPayload.stream(size: 8) { eventLoop in + var buffer = ByteBufferAllocator().buffer(capacity: 0) + buffer.writeString("String longer than 8 bytes") + return eventLoop.makeSucceededFuture(.byteBuffer(buffer)) + } + do { + try await self.testRequestStreamingWithPayload(payload) + XCTFail("Should not get here") + } catch { + XCTAssertEqual(error as? AWSClient.ClientError, .tooMuchData) + } + } + + func testRequestStreamingNotEnoughData() async throws { + var byteBuffer = ByteBufferAllocator().buffer(staticString: "Buffer") + let payload = AWSPayload.stream(size: byteBuffer.readableBytes + 1) { eventLoop in + let size = byteBuffer.readableBytes + if size == 0 { + return eventLoop.makeSucceededFuture(.end) + } + let buffer = byteBuffer.readSlice(length: size)! + return eventLoop.makeSucceededFuture(.byteBuffer(buffer)) + } + do { + try await self.testRequestStreamingWithPayload(payload) + XCTFail("Should not get here") + } catch { + XCTAssertEqual(error as? AWSClient.ClientError, .notEnoughData) + } + } + + func testRequestStreamingFile() async throws { + struct Input: AWSEncodableShape & AWSShapeWithPayload { + static var _payloadPath: String = "payload" + static var _options: AWSShapeOptions = [.allowStreaming] + let payload: AWSPayload + private enum CodingKeys: CodingKey {} + } + + let awsServer = AWSTestServer(serviceProtocol: .json) + let httpClient = HTTPClient(eventLoopGroupProvider: .createNew) + let config = createServiceConfig(endpoint: awsServer.address) + let client = createAWSClient(credentialProvider: .empty, httpClientProvider: .shared(httpClient)) + defer { + XCTAssertNoThrow(try awsServer.stop()) + XCTAssertNoThrow(try client.syncShutdown()) + XCTAssertNoThrow(try httpClient.syncShutdown()) + } + do { + let bufferSize = 208 * 1024 + let data = Data(createRandomBuffer(45, 9182, size: bufferSize)) + let filename = "testRequestStreamingFile" + let fileURL = URL(fileURLWithPath: filename) + try data.write(to: fileURL) + defer { + XCTAssertNoThrow(try FileManager.default.removeItem(at: fileURL)) + } + + let threadPool = NIOThreadPool(numberOfThreads: 3) + threadPool.start() + let fileIO = NonBlockingFileIO(threadPool: threadPool) + let fileHandle = try await fileIO.openFile(path: filename, mode: .read, eventLoop: httpClient.eventLoopGroup.next()).get() + defer { + XCTAssertNoThrow(try fileHandle.close()) + XCTAssertNoThrow(try threadPool.syncShutdownGracefully()) + } + + let input = Input(payload: .fileHandle(fileHandle, size: bufferSize, fileIO: fileIO) { size in print(size) }) + async let responseTask: Void = client.execute(operation: "test", path: "/", httpMethod: .POST, serviceConfig: config, input: input, logger: TestEnvironment.logger) + + try awsServer.processRaw { request in + XCTAssertNil(request.headers["transfer-encoding"]) + XCTAssertEqual(request.headers["Content-Length"], bufferSize.description) + let requestData = request.body.getData(at: 0, length: request.body.readableBytes) + XCTAssertEqual(requestData, data) + let response = AWSTestServer.Response(httpStatus: .ok, headers: [:], body: nil) + return .result(response) + } + + try await responseTask + } catch let error as AWSClient.ClientError where error == .tooMuchData { + } catch { + XCTFail("Unexpected error: \(error)") + } + } + + func testRequestChunkedStreaming() async throws { + struct Input: AWSEncodableShape & AWSShapeWithPayload { + static var _payloadPath: String = "payload" + static var _options: AWSShapeOptions = [.allowStreaming, .allowChunkedStreaming, .rawPayload] + let payload: AWSPayload + private enum CodingKeys: CodingKey {} + } + + let awsServer = AWSTestServer(serviceProtocol: .json) + let httpClient = HTTPClient(eventLoopGroupProvider: .createNew) + let config = createServiceConfig(endpoint: awsServer.address) + let client = createAWSClient(credentialProvider: .empty, httpClientProvider: .shared(httpClient)) + defer { + XCTAssertNoThrow(try awsServer.stop()) + XCTAssertNoThrow(try client.syncShutdown()) + XCTAssertNoThrow(try httpClient.syncShutdown()) + } + do { + // supply buffer in 16k blocks + let bufferSize = 145 * 1024 + let blockSize = 16 * 1024 + let data = createRandomBuffer(45, 9182, size: bufferSize) + var byteBuffer = ByteBufferAllocator().buffer(capacity: bufferSize) + byteBuffer.writeBytes(data) + + let payload = AWSPayload.stream { eventLoop in + let size = min(blockSize, byteBuffer.readableBytes) + if size == 0 { + return eventLoop.makeSucceededFuture(.end) + } else { + return eventLoop.makeSucceededFuture(.byteBuffer(byteBuffer.readSlice(length: size)!)) + } + } + let input = Input(payload: payload) + async let responseTask: Void = client.execute(operation: "test", path: "/", httpMethod: .POST, serviceConfig: config, input: input, logger: TestEnvironment.logger) + + try awsServer.processRaw { request in + let bytes = request.body.getBytes(at: 0, length: request.body.readableBytes) + XCTAssertTrue(bytes == data) + let response = AWSTestServer.Response(httpStatus: .ok, headers: [:], body: nil) + return .result(response) + } + + try await responseTask + } catch { + XCTFail("Unexpected error: \(error)") + } + }*/ func testProvideHTTPClient() async { do { @@ -746,101 +746,101 @@ class AWSClientTests: XCTestCase { } } - func testStreamingResponse() async { - struct Input: AWSEncodableShape {} - struct Output: AWSDecodableShape & Encodable { - static let _encoding = [AWSMemberEncoding(label: "test", location: .header("test"))] - let test: String - } - let data = createRandomBuffer(45, 109, size: 128 * 1024) - - do { - let awsServer = AWSTestServer(serviceProtocol: .json) - let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 5) - let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) - let config = createServiceConfig(endpoint: awsServer.address) - let client = createAWSClient(credentialProvider: .empty, httpClientProvider: .shared(httpClient)) - defer { - XCTAssertNoThrow(try client.syncShutdown()) - XCTAssertNoThrow(try httpClient.syncShutdown()) - XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) - XCTAssertNoThrow(try awsServer.stop()) - } - let countAtomic = ManagedAtomic(0) - async let responseTask: Output = client.execute( - operation: "test", - path: "/", - httpMethod: .GET, - serviceConfig: config, - input: Input(), - logger: TestEnvironment.logger - ) { (payload: ByteBuffer, eventLoop: EventLoop) in - let payloadSize = payload.readableBytes - let count = countAtomic.loadThenWrappingIncrement(by: payloadSize, ordering: .relaxed) - let slice = Data(data[count..<(count + payloadSize)]) - let payloadData = payload.getData(at: 0, length: payload.readableBytes) - XCTAssertEqual(slice, payloadData) - return eventLoop.makeSucceededFuture(()) - } - - try awsServer.processRaw { _ in - var byteBuffer = ByteBufferAllocator().buffer(capacity: 128 * 1024) - byteBuffer.writeBytes(data) - let response = AWSTestServer.Response(httpStatus: .ok, headers: ["test": "TestHeader"], body: byteBuffer) - return .result(response) - } - - let result = try await responseTask - XCTAssertEqual(result.test, "TestHeader") - XCTAssertEqual(countAtomic.load(ordering: .relaxed), 128 * 1024) - } catch { - XCTFail("Unexpected error: \(error)") - } - } - - func testStreamingDelegateFinished() async throws { - struct Input: AWSEncodableShape {} - struct Output: AWSDecodableShape & Encodable { - static let _encoding = [AWSMemberEncoding(label: "test", location: .header("test"))] - let test: String - } - let bufferSize = 200 * 1024 - let data = createRandomBuffer(45, 109, size: bufferSize) - - let awsServer = AWSTestServer(serviceProtocol: .json) - let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 5) - let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) - let config = createServiceConfig(endpoint: awsServer.address) - let client = createAWSClient(credentialProvider: .empty, httpClientProvider: .shared(httpClient)) - defer { - XCTAssertNoThrow(try client.syncShutdown()) - XCTAssertNoThrow(try httpClient.syncShutdown()) - XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) - XCTAssertNoThrow(try awsServer.stop()) - } - let countAtomic = ManagedAtomic(0) - async let responseTask: Output = client.execute( - operation: "test", - path: "/", - httpMethod: .GET, - serviceConfig: config, - input: Input(), - logger: TestEnvironment.logger - ) { (_: ByteBuffer, eventLoop: EventLoop) in - countAtomic.wrappingIncrement(by: 1, ordering: .relaxed) - return eventLoop.scheduleTask(in: .milliseconds(200)) { - countAtomic.wrappingDecrement(by: 1, ordering: .relaxed) - }.futureResult - } - - XCTAssertNoThrow(try awsServer.processRaw { _ in - var byteBuffer = ByteBufferAllocator().buffer(capacity: bufferSize) - byteBuffer.writeBytes(data) - let response = AWSTestServer.Response(httpStatus: .ok, headers: ["test": "TestHeader"], body: byteBuffer) - return .result(response) - }) - - _ = try await responseTask - XCTAssertEqual(countAtomic.load(ordering: .relaxed), 0) - } + /* func testStreamingResponse() async { + struct Input: AWSEncodableShape {} + struct Output: AWSDecodableShape & Encodable { + static let _encoding = [AWSMemberEncoding(label: "test", location: .header("test"))] + let test: String + } + let data = createRandomBuffer(45, 109, size: 128 * 1024) + + do { + let awsServer = AWSTestServer(serviceProtocol: .json) + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 5) + let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) + let config = createServiceConfig(endpoint: awsServer.address) + let client = createAWSClient(credentialProvider: .empty, httpClientProvider: .shared(httpClient)) + defer { + XCTAssertNoThrow(try client.syncShutdown()) + XCTAssertNoThrow(try httpClient.syncShutdown()) + XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) + XCTAssertNoThrow(try awsServer.stop()) + } + let countAtomic = ManagedAtomic(0) + async let responseTask: Output = client.execute( + operation: "test", + path: "/", + httpMethod: .GET, + serviceConfig: config, + input: Input(), + logger: TestEnvironment.logger + ) { (payload: ByteBuffer, eventLoop: EventLoop) in + let payloadSize = payload.readableBytes + let count = countAtomic.loadThenWrappingIncrement(by: payloadSize, ordering: .relaxed) + let slice = Data(data[count..<(count + payloadSize)]) + let payloadData = payload.getData(at: 0, length: payload.readableBytes) + XCTAssertEqual(slice, payloadData) + return eventLoop.makeSucceededFuture(()) + } + + try awsServer.processRaw { _ in + var byteBuffer = ByteBufferAllocator().buffer(capacity: 128 * 1024) + byteBuffer.writeBytes(data) + let response = AWSTestServer.Response(httpStatus: .ok, headers: ["test": "TestHeader"], body: byteBuffer) + return .result(response) + } + + let result = try await responseTask + XCTAssertEqual(result.test, "TestHeader") + XCTAssertEqual(countAtomic.load(ordering: .relaxed), 128 * 1024) + } catch { + XCTFail("Unexpected error: \(error)") + } + } + + func testStreamingDelegateFinished() async throws { + struct Input: AWSEncodableShape {} + struct Output: AWSDecodableShape & Encodable { + static let _encoding = [AWSMemberEncoding(label: "test", location: .header("test"))] + let test: String + } + let bufferSize = 200 * 1024 + let data = createRandomBuffer(45, 109, size: bufferSize) + + let awsServer = AWSTestServer(serviceProtocol: .json) + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 5) + let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) + let config = createServiceConfig(endpoint: awsServer.address) + let client = createAWSClient(credentialProvider: .empty, httpClientProvider: .shared(httpClient)) + defer { + XCTAssertNoThrow(try client.syncShutdown()) + XCTAssertNoThrow(try httpClient.syncShutdown()) + XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) + XCTAssertNoThrow(try awsServer.stop()) + } + let countAtomic = ManagedAtomic(0) + async let responseTask: Output = client.execute( + operation: "test", + path: "/", + httpMethod: .GET, + serviceConfig: config, + input: Input(), + logger: TestEnvironment.logger + ) { (_: ByteBuffer, eventLoop: EventLoop) in + countAtomic.wrappingIncrement(by: 1, ordering: .relaxed) + return eventLoop.scheduleTask(in: .milliseconds(200)) { + countAtomic.wrappingDecrement(by: 1, ordering: .relaxed) + }.futureResult + } + + XCTAssertNoThrow(try awsServer.processRaw { _ in + var byteBuffer = ByteBufferAllocator().buffer(capacity: bufferSize) + byteBuffer.writeBytes(data) + let response = AWSTestServer.Response(httpStatus: .ok, headers: ["test": "TestHeader"], body: byteBuffer) + return .result(response) + }) + + _ = try await responseTask + XCTAssertEqual(countAtomic.load(ordering: .relaxed), 0) + } */ } diff --git a/Tests/SotoCoreTests/AWSResponseTests.swift b/Tests/SotoCoreTests/AWSResponseTests.swift index ff2f7d653..dafac968e 100644 --- a/Tests/SotoCoreTests/AWSResponseTests.swift +++ b/Tests/SotoCoreTests/AWSResponseTests.swift @@ -20,7 +20,7 @@ import SotoXML import XCTest class AWSResponseTests: XCTestCase { - func testHeaderResponseDecoding() { + func testHeaderResponseDecoding() async throws { struct Output: AWSDecodableShape { static let _encoding = [AWSMemberEncoding(label: "h", location: .header("header-member"))] let h: String @@ -28,28 +28,25 @@ class AWSResponseTests: XCTestCase { case h = "header-member" } } - let response = AWSHTTPResponseImpl( + let response = AWSHTTPResponse( status: .ok, - headers: ["header-member": "test-header"], - bodyData: nil + headers: ["header-member": "test-header"] ) - // XML - var awsXMLResponse: AWSResponse? + // XML var awsXMLResponse: awsResponse var xmlResult: Output? - XCTAssertNoThrow(awsXMLResponse = try AWSResponse(from: response, serviceProtocol: .query, raw: false)) - XCTAssertNoThrow(xmlResult = try awsXMLResponse?.generateOutputShape(operation: "Test")) + let awsXMLResponse = try await AWSResponse(from: response, serviceProtocol: .query, raw: false) + XCTAssertNoThrow(xmlResult = try awsXMLResponse.generateOutputShape(operation: "Test")) XCTAssertEqual(xmlResult?.h, "test-header") // JSON - var awsJSONResponse: AWSResponse? var jsonResult: Output? - XCTAssertNoThrow(awsJSONResponse = try AWSResponse(from: response, serviceProtocol: .restjson, raw: false)) - XCTAssertNoThrow(jsonResult = try awsJSONResponse?.generateOutputShape(operation: "Test")) + let awsJSONResponse = try await AWSResponse(from: response, serviceProtocol: .restjson, raw: false) + XCTAssertNoThrow(jsonResult = try awsJSONResponse.generateOutputShape(operation: "Test")) XCTAssertEqual(jsonResult?.h, "test-header") } - func testHeaderResponseTypeDecoding() { + func testHeaderResponseTypeDecoding() async throws { struct Output: AWSDecodableShape { static let _encoding = [ AWSMemberEncoding(label: "string", location: .header("string")), @@ -64,7 +61,7 @@ class AWSResponseTests: XCTestCase { let integer: Int let bool: Bool } - let response = AWSHTTPResponseImpl( + let response = AWSHTTPResponse( status: .ok, headers: [ "string": "test-header", @@ -75,11 +72,10 @@ class AWSResponseTests: XCTestCase { ] ) - // JSON - var awsJSONResponse: AWSResponse? + // JSON var awsJSONResponse: awsResponse var jsonResult: Output? - XCTAssertNoThrow(awsJSONResponse = try AWSResponse(from: response, serviceProtocol: .restjson, raw: false)) - XCTAssertNoThrow(jsonResult = try awsJSONResponse?.generateOutputShape(operation: "Test")) + let awsJSONResponse = try await AWSResponse(from: response, serviceProtocol: .restjson, raw: false) + XCTAssertNoThrow(jsonResult = try awsJSONResponse.generateOutputShape(operation: "Test")) XCTAssertEqual(jsonResult?.string, "test-header") XCTAssertEqual(jsonResult?.string2, "23") XCTAssertEqual(jsonResult?.double, 3.14) @@ -87,53 +83,49 @@ class AWSResponseTests: XCTestCase { XCTAssertEqual(jsonResult?.bool, false) } - func testStatusCodeResponseDecoding() { + func testStatusCodeResponseDecoding() async throws { struct Output: AWSDecodableShape { static let _encoding = [AWSMemberEncoding(label: "status", location: .statusCode)] let status: Int } - let response = AWSHTTPResponseImpl( + let response = AWSHTTPResponse( status: .ok, - headers: HTTPHeaders(), - bodyData: nil + headers: HTTPHeaders() ) // XML - var awsXMLResponse: AWSResponse? var xmlResult: Output? - XCTAssertNoThrow(awsXMLResponse = try AWSResponse(from: response, serviceProtocol: .query, raw: false)) - XCTAssertNoThrow(xmlResult = try awsXMLResponse?.generateOutputShape(operation: "Test")) + let awsXMLResponse = try await AWSResponse(from: response, serviceProtocol: .query, raw: false) + XCTAssertNoThrow(xmlResult = try awsXMLResponse.generateOutputShape(operation: "Test")) XCTAssertEqual(xmlResult?.status, 200) // JSON - var awsJSONResponse: AWSResponse? var jsonResult: Output? - XCTAssertNoThrow(awsJSONResponse = try AWSResponse(from: response, serviceProtocol: .restjson, raw: false)) - XCTAssertNoThrow(jsonResult = try awsJSONResponse?.generateOutputShape(operation: "Test")) + let awsJSONResponse = try await AWSResponse(from: response, serviceProtocol: .restjson, raw: false) + XCTAssertNoThrow(jsonResult = try awsJSONResponse.generateOutputShape(operation: "Test")) XCTAssertEqual(jsonResult?.status, 200) } // MARK: XML tests - func testValidateXMLResponse() { + func testValidateXMLResponse() async throws { struct Output: AWSDecodableShape { let name: String } let responseBody = "hello" - let response = AWSHTTPResponseImpl( + let response = AWSHTTPResponse( status: .ok, headers: HTTPHeaders(), - bodyData: Data(responseBody.utf8) + body: .init(ByteBuffer(string: responseBody)) ) - var awsResponse: AWSResponse? var output: Output? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .restxml, raw: false)) - XCTAssertNoThrow(output = try awsResponse?.generateOutputShape(operation: "Test")) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .restxml, raw: false) + XCTAssertNoThrow(output = try awsResponse.generateOutputShape(operation: "Test")) XCTAssertEqual(output?.name, "hello") } - func testValidateXMLCodablePayloadResponse() { + func testValidateXMLCodablePayloadResponse() async throws { struct Output: AWSDecodableShape & AWSShapeWithPayload { static let _encoding = [AWSMemberEncoding(label: "contentType", location: .header("content-type"))] static let _payloadPath: String = "name" @@ -145,59 +137,56 @@ class AWSResponseTests: XCTestCase { case contentType = "content-type" } } - let response = AWSHTTPResponseImpl( + let response = AWSHTTPResponse( status: .ok, headers: ["Content-Type": "application/xml"], - bodyData: "hello".data(using: .utf8)! + body: .init(ByteBuffer(string: "hello")) ) - var awsResponse: AWSResponse? var output: Output? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .restxml, raw: false)) - XCTAssertNoThrow(output = try awsResponse?.generateOutputShape(operation: "Test")) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .restxml, raw: false) + XCTAssertNoThrow(output = try awsResponse.generateOutputShape(operation: "Test")) XCTAssertEqual(output?.name, "hello") XCTAssertEqual(output?.contentType, "application/xml") } - func testValidateXMLRawPayloadResponse() { + func testValidateXMLRawPayloadResponse() async throws { struct Output: AWSDecodableShape, AWSShapeWithPayload { static let _payloadPath: String = "body" static let _options: AWSShapeOptions = .rawPayload let body: AWSPayload } - let response = AWSHTTPResponseImpl( + let response = AWSHTTPResponse( status: .ok, headers: HTTPHeaders(), - bodyData: Data("{\"name\":\"hello\"}".utf8) + body: .init(ByteBuffer(string: "{\"name\":\"hello\"}")) ) - var awsResponse: AWSResponse? var output: Output? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .restxml, raw: true)) - XCTAssertNoThrow(output = try awsResponse?.generateOutputShape(operation: "Test")) - XCTAssertEqual(output?.body.asData(), Data("{\"name\":\"hello\"}".utf8)) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .restxml, raw: true) + XCTAssertNoThrow(output = try awsResponse.generateOutputShape(operation: "Test")) + XCTAssertEqual(output?.body.asString(), "{\"name\":\"hello\"}") } // MARK: JSON tests - func testValidateJSONResponse() { + func testValidateJSONResponse() async throws { struct Output: AWSDecodableShape { let name: String } - let response = AWSHTTPResponseImpl( + let response = AWSHTTPResponse( status: .ok, headers: HTTPHeaders(), - bodyData: Data("{\"name\":\"hello\"}".utf8) + body: .init(ByteBuffer(string: "{\"name\":\"hello\"}")) ) - var awsResponse: AWSResponse? var output: Output? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .json(version: "1.1"), raw: false)) - XCTAssertNoThrow(output = try awsResponse?.generateOutputShape(operation: "Test")) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .json(version: "1.1"), raw: false) + XCTAssertNoThrow(output = try awsResponse.generateOutputShape(operation: "Test")) XCTAssertEqual(output?.name, "hello") } - func testValidateJSONCodablePayloadResponse() { + func testValidateJSONCodablePayloadResponse() async throws { struct Output2: AWSDecodableShape { let name: String } @@ -205,20 +194,19 @@ class AWSResponseTests: XCTestCase { static let _payloadPath: String = "output2" let output2: Output2 } - let response = AWSHTTPResponseImpl( + let response = AWSHTTPResponse( status: .ok, headers: HTTPHeaders(), - bodyData: Data("{\"name\":\"hello\"}".utf8) + body: .init(ByteBuffer(string: "{\"name\":\"hello\"}")) ) - var awsResponse: AWSResponse? var output: Output? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .json(version: "1.1"), raw: false)) - XCTAssertNoThrow(output = try awsResponse?.generateOutputShape(operation: "Test")) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .json(version: "1.1"), raw: false) + XCTAssertNoThrow(output = try awsResponse.generateOutputShape(operation: "Test")) XCTAssertEqual(output?.output2.name, "hello") } - func testValidateJSONRawPayloadResponse() { + func testValidateJSONRawPayloadResponse() async throws { struct Output: AWSDecodableShape, AWSShapeWithPayload { static let _payloadPath: String = "body" static let _options: AWSShapeOptions = .rawPayload @@ -227,154 +215,147 @@ class AWSResponseTests: XCTestCase { ] let body: AWSPayload } - let response = AWSHTTPResponseImpl( + let response = AWSHTTPResponse( status: .ok, headers: ["Content-Type": "application/json"], - bodyData: Data("{\"name\":\"hello\"}".utf8) + body: .init(ByteBuffer(string: "{\"name\":\"hello\"}")) ) - var awsResponse: AWSResponse? var output: Output? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .json(version: "1.1"), raw: true)) - XCTAssertNoThrow(output = try awsResponse?.generateOutputShape(operation: "Test")) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .json(version: "1.1"), raw: true) + XCTAssertNoThrow(output = try awsResponse.generateOutputShape(operation: "Test")) XCTAssertEqual(output?.body.asString(), "{\"name\":\"hello\"}") } // MARK: Error tests - func testJSONError() { - let response = AWSHTTPResponseImpl( + func testJSONError() async throws { + let response = AWSHTTPResponse( status: .notFound, headers: HTTPHeaders(), - bodyData: "{\"__type\":\"ResourceNotFoundException\", \"message\": \"Donald Where's Your Troosers?\"}".data(using: .utf8)! + body: .init(ByteBuffer(string: "{\"__type\":\"ResourceNotFoundException\", \"message\": \"Donald Where's Your Troosers?\"}")) ) let service = createServiceConfig(serviceProtocol: .json(version: "1.1"), errorType: ServiceErrorType.self) - var awsResponse: AWSResponse? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .json(version: "1.1"), raw: false)) - let error = awsResponse?.generateError(serviceConfig: service, logger: TestEnvironment.logger) as? ServiceErrorType + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .json(version: "1.1"), raw: false) + let error = awsResponse.generateError(serviceConfig: service, logger: TestEnvironment.logger) as? ServiceErrorType XCTAssertEqual(error, ServiceErrorType.resourceNotFoundException) XCTAssertEqual(error?.message, "Donald Where's Your Troosers?") XCTAssertEqual(error?.context?.responseCode, .notFound) } - func testJSONErrorV2() { - let response = AWSHTTPResponseImpl( + func testJSONErrorV2() async throws { + let response = AWSHTTPResponse( status: .notFound, headers: HTTPHeaders(), - bodyData: #"{"__type":"ResourceNotFoundException", "Message": "Donald Where's Your Troosers?", "fault": "client","CancellationReasons":1}"#.data(using: .utf8)! + body: .init(ByteBuffer(string: + #"{"__type":"ResourceNotFoundException", "Message": "Donald Where's Your Troosers?", "fault": "client","CancellationReasons":1}"# + )) ) let service = createServiceConfig(serviceProtocol: .json(version: "1.1"), errorType: ServiceErrorType.self) - var awsResponse: AWSResponse? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .json(version: "1.1"), raw: false)) - let error = awsResponse?.generateError(serviceConfig: service, logger: TestEnvironment.logger) as? ServiceErrorType + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .json(version: "1.1"), raw: false) + let error = awsResponse.generateError(serviceConfig: service, logger: TestEnvironment.logger) as? ServiceErrorType XCTAssertEqual(error, ServiceErrorType.resourceNotFoundException) XCTAssertEqual(error?.message, "Donald Where's Your Troosers?") XCTAssertEqual(error?.context?.responseCode, .notFound) XCTAssertEqual(error?.context?.additionalFields["fault"], "client") } - func testRestJSONError() { - let response = AWSHTTPResponseImpl( + func testRestJSONError() async throws { + let response = AWSHTTPResponse( status: .notFound, headers: ["x-amzn-errortype": "ResourceNotFoundException"], - bodyData: Data(#"{"message": "Donald Where's Your Troosers?", "Fault": "Client"}"#.utf8) + body: .init(ByteBuffer(string: #"{"message": "Donald Where's Your Troosers?", "Fault": "Client"}"#)) ) let service = createServiceConfig(serviceProtocol: .restjson, errorType: ServiceErrorType.self) - var awsResponse: AWSResponse? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .restjson, raw: false)) - let error = awsResponse?.generateError(serviceConfig: service, logger: TestEnvironment.logger) as? ServiceErrorType + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .restjson, raw: false) + let error = awsResponse.generateError(serviceConfig: service, logger: TestEnvironment.logger) as? ServiceErrorType XCTAssertEqual(error, ServiceErrorType.resourceNotFoundException) XCTAssertEqual(error?.message, "Donald Where's Your Troosers?") XCTAssertEqual(error?.context?.responseCode, .notFound) XCTAssertEqual(error?.context?.additionalFields["Fault"], "Client") } - func testRestJSONErrorV2() { + func testRestJSONErrorV2() async throws { // Capitalized "Message" - let response = AWSHTTPResponseImpl( + let response = AWSHTTPResponse( status: .notFound, headers: ["x-amzn-errortype": "ResourceNotFoundException"], - bodyData: Data(#"{"Message": "Donald Where's Your Troosers?"}"#.utf8) + body: .init(ByteBuffer(string: #"{"Message": "Donald Where's Your Troosers?"}"#)) ) let service = createServiceConfig(serviceProtocol: .restjson, errorType: ServiceErrorType.self) - var awsResponse: AWSResponse? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .restjson, raw: false)) - let error = awsResponse?.generateError(serviceConfig: service, logger: TestEnvironment.logger) as? ServiceErrorType + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .restjson, raw: false) + let error = awsResponse.generateError(serviceConfig: service, logger: TestEnvironment.logger) as? ServiceErrorType XCTAssertEqual(error, ServiceErrorType.resourceNotFoundException) XCTAssertEqual(error?.message, "Donald Where's Your Troosers?") XCTAssertEqual(error?.context?.responseCode, .notFound) } - func testXMLError() { - let response = AWSHTTPResponseImpl( + func testXMLError() async throws { + let response = AWSHTTPResponse( status: .notFound, headers: HTTPHeaders(), - bodyData: "NoSuchKeyIt doesn't existclient".data(using: .utf8)! + body: .init(ByteBuffer(string: "NoSuchKeyIt doesn't existclient")) ) let service = createServiceConfig(serviceProtocol: .restxml, errorType: ServiceErrorType.self) - var awsResponse: AWSResponse? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .restxml, raw: false)) - let error = awsResponse?.generateError(serviceConfig: service, logger: TestEnvironment.logger) as? ServiceErrorType + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .restxml, raw: false) + let error = awsResponse.generateError(serviceConfig: service, logger: TestEnvironment.logger) as? ServiceErrorType XCTAssertEqual(error, ServiceErrorType.noSuchKey) XCTAssertEqual(error?.message, "It doesn't exist") XCTAssertEqual(error?.context?.responseCode, .notFound) XCTAssertEqual(error?.context?.additionalFields["fault"], "client") } - func testQueryError() { - let response = AWSHTTPResponseImpl( + func testQueryError() async throws { + let response = AWSHTTPResponse( status: .notFound, headers: HTTPHeaders(), - bodyData: "MessageRejectedDon't like itclient".data(using: .utf8)! + body: .init(ByteBuffer(string: "MessageRejectedDon't like itclient")) ) let queryService = createServiceConfig(serviceProtocol: .query, errorType: ServiceErrorType.self) - var awsResponse: AWSResponse? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .query, raw: false)) - let error = awsResponse?.generateError(serviceConfig: queryService, logger: TestEnvironment.logger) as? ServiceErrorType + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .query, raw: false) + let error = awsResponse.generateError(serviceConfig: queryService, logger: TestEnvironment.logger) as? ServiceErrorType XCTAssertEqual(error, ServiceErrorType.messageRejected) XCTAssertEqual(error?.message, "Don't like it") XCTAssertEqual(error?.context?.responseCode, .notFound) XCTAssertEqual(error?.context?.additionalFields["fault"], "client") } - func testEC2Error() { - let response = AWSHTTPResponseImpl( + func testEC2Error() async throws { + let response = AWSHTTPResponse( status: .notFound, headers: HTTPHeaders(), - bodyData: "NoSuchKeyIt doesn't existclient".data(using: .utf8)! + body: .init(ByteBuffer(string: "NoSuchKeyIt doesn't existclient")) ) let service = createServiceConfig(serviceProtocol: .ec2) - var awsResponse: AWSResponse? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .ec2, raw: false)) - let error = awsResponse?.generateError(serviceConfig: service, logger: TestEnvironment.logger) as? AWSResponseError + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .ec2, raw: false) + let error = awsResponse.generateError(serviceConfig: service, logger: TestEnvironment.logger) as? AWSResponseError XCTAssertEqual(error?.errorCode, "NoSuchKey") XCTAssertEqual(error?.message, "It doesn't exist") XCTAssertEqual(error?.context?.responseCode, .notFound) XCTAssertEqual(error?.context?.additionalFields["fault"], "client") } - func testAdditionalErrorFields() { - let response = AWSHTTPResponseImpl( + func testAdditionalErrorFields() async throws { + let response = AWSHTTPResponse( status: .notFound, headers: HTTPHeaders(), - bodyData: "NoSuchKeyIt doesn't existclient".data(using: .utf8)! + body: .init(ByteBuffer(string: "NoSuchKeyIt doesn't existclient")) ) let service = createServiceConfig(serviceProtocol: .restxml) - var awsResponse: AWSResponse? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .ec2, raw: false)) - let error = awsResponse?.generateError(serviceConfig: service, logger: TestEnvironment.logger) as? AWSResponseError + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .ec2, raw: false) + let error = awsResponse.generateError(serviceConfig: service, logger: TestEnvironment.logger) as? AWSResponseError XCTAssertEqual(error?.context?.additionalFields["fault"], "client") } - func testHeaderPrefixFromDictionary() { + func testHeaderPrefixFromDictionary() async throws { struct Output: AWSDecodableShape { static let _encoding: [AWSMemberEncoding] = [ .init(label: "content", location: .headerPrefix("prefix-")), @@ -384,19 +365,18 @@ class AWSResponseTests: XCTestCase { case content = "prefix-" } } - let response = AWSHTTPResponseImpl( + let response = AWSHTTPResponse( status: .ok, headers: ["prefix-one": "first", "prefix-two": "second"] ) - var awsResponse: AWSResponse? var output: Output? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .restxml)) - XCTAssertNoThrow(output = try awsResponse?.generateOutputShape(operation: "Test")) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .restxml) + XCTAssertNoThrow(output = try awsResponse.generateOutputShape(operation: "Test")) XCTAssertEqual(output?.content["one"], "first") XCTAssertEqual(output?.content["two"], "second") } - func testHeaderPrefixFromXML() { + func testHeaderPrefixFromXML() async throws { struct Output: AWSDecodableShape { static let _encoding: [AWSMemberEncoding] = [ .init(label: "content", location: .headerPrefix("prefix-")), @@ -408,22 +388,21 @@ class AWSResponseTests: XCTestCase { case content = "prefix-" } } - let response = AWSHTTPResponseImpl( + let response = AWSHTTPResponse( status: .ok, headers: ["prefix-one": "first", "prefix-two": "second"], - bodyData: Data("Hello".utf8) + body: .init(ByteBuffer(string: "Hello")) ) - var awsResponse: AWSResponse? var output: Output? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .restxml)) - XCTAssertNoThrow(output = try awsResponse?.generateOutputShape(operation: "Test")) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .restxml) + XCTAssertNoThrow(output = try awsResponse.generateOutputShape(operation: "Test")) XCTAssertEqual(output?.content["one"], "first") XCTAssertEqual(output?.content["two"], "second") } // MARK: Miscellaneous tests - func testProcessHAL() { + func testProcessHAL() async throws { struct Output: AWSDecodableShape { let s: String let i: Int @@ -433,16 +412,15 @@ class AWSResponseTests: XCTestCase { let d: Double let b: Bool } - let response = AWSHTTPResponseImpl( + let response = AWSHTTPResponse( status: .ok, headers: ["Content-Type": "application/hal+json"], - bodyData: Data(#"{"_embedded": {"a": [{"s":"Hello", "i":1234}, {"s":"Hello2", "i":12345}]}, "d":3.14, "b":true}"#.utf8) + body: .init(ByteBuffer(string: #"{"_embedded": {"a": [{"s":"Hello", "i":1234}, {"s":"Hello2", "i":12345}]}, "d":3.14, "b":true}"#)) ) - var awsResponse: AWSResponse? var output: Output2? - XCTAssertNoThrow(awsResponse = try AWSResponse(from: response, serviceProtocol: .json(version: "1.1"), raw: false)) - XCTAssertNoThrow(output = try awsResponse?.generateOutputShape(operation: "Test")) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .json(version: "1.1"), raw: false) + XCTAssertNoThrow(output = try awsResponse.generateOutputShape(operation: "Test")) XCTAssertEqual(output?.a.count, 2) XCTAssertEqual(output?.d, 3.14) XCTAssertEqual(output?.a[1].s, "Hello2") @@ -450,27 +428,6 @@ class AWSResponseTests: XCTestCase { // MARK: Types used in tests - struct AWSHTTPResponseImpl: AWSHTTPResponse { - let status: HTTPResponseStatus - let headers: HTTPHeaders - let body: ByteBuffer? - - init(status: HTTPResponseStatus, headers: HTTPHeaders, body: ByteBuffer? = nil) { - self.status = status - self.headers = headers - self.body = body - } - - init(status: HTTPResponseStatus, headers: HTTPHeaders, bodyData: Data?) { - var body: ByteBuffer? - if let bodyData = bodyData { - body = ByteBufferAllocator().buffer(capacity: bodyData.count) - body?.writeBytes(bodyData) - } - self.init(status: status, headers: headers, body: body) - } - } - struct ServiceErrorType: AWSErrorType, Equatable { enum Code: String { case resourceNotFoundException = "ResourceNotFoundException" diff --git a/Tests/SotoCoreTests/TimeStampTests.swift b/Tests/SotoCoreTests/TimeStampTests.swift index 14b498c53..82a5e0a17 100644 --- a/Tests/SotoCoreTests/TimeStampTests.swift +++ b/Tests/SotoCoreTests/TimeStampTests.swift @@ -36,14 +36,14 @@ class TimeStampTests: XCTestCase { let date: Date } - func testDecodeJSON() { + func testDecodeJSON() async throws { do { struct A: AWSDecodableShape { let date: Date } - let byteBuffer = ByteBufferAllocator().buffer(string: "{\"date\": 234876345}") - let response = AWSHTTPResponseImpl(status: .ok, headers: [:], body: byteBuffer) - let awsResponse = try AWSResponse(from: response, serviceProtocol: .json(version: "1.1")) + let byteBuffer = ByteBuffer(string: "{\"date\": 234876345}") + let response = AWSHTTPResponse(status: .ok, headers: [:], body: .init(byteBuffer)) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .json(version: "1.1")) let a: A = try awsResponse.generateOutputShape(operation: "TestOperation") XCTAssertEqual(a.date.timeIntervalSince1970, 234_876_345) } catch { @@ -51,7 +51,7 @@ class TimeStampTests: XCTestCase { } } - func testEncodeJSON() { + func testEncodeJSON() async throws { struct A: AWSEncodableShape { var date: Date } @@ -65,15 +65,15 @@ class TimeStampTests: XCTestCase { #endif } - func testDecodeXML() { + func testDecodeXML() async throws { do { struct A: AWSDecodableShape { let date: Date let date2: Date } - let byteBuffer = ByteBufferAllocator().buffer(string: "2017-01-01T00:01:00.000Z2017-01-01T00:02:00Z") - let response = AWSHTTPResponseImpl(status: .ok, headers: [:], body: byteBuffer) - let awsResponse = try AWSResponse(from: response, serviceProtocol: .restxml) + let byteBuffer = ByteBuffer(string: "2017-01-01T00:01:00.000Z2017-01-01T00:02:00Z") + let response = AWSHTTPResponse(status: .ok, headers: [:], body: .init(byteBuffer)) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .restxml) let a: A = try awsResponse.generateOutputShape(operation: "TestOperation") XCTAssertEqual(self.dateFormatter.string(from: a.date), "2017-01-01T00:01:00.000Z") XCTAssertEqual(self.dateFormatter.string(from: a.date2), "2017-01-01T00:02:00.000Z") @@ -82,7 +82,7 @@ class TimeStampTests: XCTestCase { } } - func testEncodeXML() { + func testEncodeXML() async throws { struct A: AWSEncodableShape { var date: Date } @@ -92,7 +92,7 @@ class TimeStampTests: XCTestCase { XCTAssertEqual(request?.body.asString(), "2017-11-01T00:15:00.000Z") } - func testEncodeQuery() { + func testEncodeQuery() async throws { struct A: AWSEncodableShape { var date: Date } @@ -102,7 +102,7 @@ class TimeStampTests: XCTestCase { XCTAssertEqual(request?.body.asString(), "Action=test&Version=01-01-2001&date=2017-11-01T00%3A15%3A00.000Z") } - func testDecodeHeader() { + func testDecodeHeader() async throws { do { struct A: AWSDecodableShape { static let _encoding = [AWSMemberEncoding(label: "date", location: .header("Date"))] @@ -111,8 +111,8 @@ class TimeStampTests: XCTestCase { case date = "Date" } } - let response = AWSHTTPResponseImpl(status: .ok, headers: ["Date": "Tue, 15 Nov 1994 12:45:27 GMT"], body: nil) - let awsResponse = try AWSResponse(from: response, serviceProtocol: .restxml) + let response = AWSHTTPResponse(status: .ok, headers: ["Date": "Tue, 15 Nov 1994 12:45:27 GMT"]) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .restxml) let a: A = try awsResponse.generateOutputShape(operation: "TestOperation") XCTAssertEqual(self.dateFormatter.string(from: a.date), "1994-11-15T12:45:27.000Z") } catch { @@ -120,14 +120,14 @@ class TimeStampTests: XCTestCase { } } - func testDecodeISOFromXML() { + func testDecodeISOFromXML() async throws { do { struct A: AWSDecodableShape { @CustomCoding var date: Date } - let byteBuffer = ByteBufferAllocator().buffer(string: "2017-01-01T00:01:00.000Z") - let response = AWSHTTPResponseImpl(status: .ok, headers: [:], body: byteBuffer) - let awsResponse = try AWSResponse(from: response, serviceProtocol: .restxml) + let byteBuffer = ByteBuffer(string: "2017-01-01T00:01:00.000Z") + let response = AWSHTTPResponse(status: .ok, headers: [:], body: .init(byteBuffer)) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .restxml) let a: A = try awsResponse.generateOutputShape(operation: "TestOperation") XCTAssertEqual(self.dateFormatter.string(from: a.date), "2017-01-01T00:01:00.000Z") } catch { @@ -135,14 +135,14 @@ class TimeStampTests: XCTestCase { } } - func testDecodeISONoMillisecondFromXML() { + func testDecodeISONoMillisecondFromXML() async throws { do { struct A: AWSDecodableShape { @CustomCoding var date: Date } - let byteBuffer = ByteBufferAllocator().buffer(string: "2017-01-01T00:01:00Z") - let response = AWSHTTPResponseImpl(status: .ok, headers: [:], body: byteBuffer) - let awsResponse = try AWSResponse(from: response, serviceProtocol: .restxml) + let byteBuffer = ByteBuffer(string: "2017-01-01T00:01:00Z") + let response = AWSHTTPResponse(status: .ok, headers: [:], body: .init(byteBuffer)) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .restxml) let a: A = try awsResponse.generateOutputShape(operation: "TestOperation") XCTAssertEqual(self.dateFormatter.string(from: a.date), "2017-01-01T00:01:00.000Z") } catch { @@ -150,15 +150,15 @@ class TimeStampTests: XCTestCase { } } - func testDecodeHttpFormattedTimestamp() { + func testDecodeHttpFormattedTimestamp() async throws { do { struct A: AWSDecodableShape { @CustomCoding var date: Date } let xml = "Tue, 15 Nov 1994 12:45:26 GMT" - let byteBuffer = ByteBufferAllocator().buffer(string: xml) - let response = AWSHTTPResponseImpl(status: .ok, headers: [:], body: byteBuffer) - let awsResponse = try AWSResponse(from: response, serviceProtocol: .restxml) + let byteBuffer = ByteBuffer(string: xml) + let response = AWSHTTPResponse(status: .ok, headers: [:], body: .init(byteBuffer)) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .restxml) let a: A = try awsResponse.generateOutputShape(operation: "TestOperation") XCTAssertEqual(self.dateFormatter.string(from: a.date), "1994-11-15T12:45:26.000Z") } catch { @@ -166,15 +166,15 @@ class TimeStampTests: XCTestCase { } } - func testDecodeUnixEpochTimestamp() { + func testDecodeUnixEpochTimestamp() async throws { do { struct A: AWSDecodableShape { @CustomCoding var date: Date } let xml = "1221382800" - let byteBuffer = ByteBufferAllocator().buffer(string: xml) - let response = AWSHTTPResponseImpl(status: .ok, headers: [:], body: byteBuffer) - let awsResponse = try AWSResponse(from: response, serviceProtocol: .restxml) + let byteBuffer = ByteBuffer(string: xml) + let response = AWSHTTPResponse(status: .ok, headers: [:], body: .init(byteBuffer)) + let awsResponse = try await AWSResponse(from: response, serviceProtocol: .restxml) let a: A = try awsResponse.generateOutputShape(operation: "TestOperation") XCTAssertEqual(self.dateFormatter.string(from: a.date), "2008-09-14T09:00:00.000Z") } catch { @@ -182,7 +182,7 @@ class TimeStampTests: XCTestCase { } } - func testEncodeISO8601ToXML() { + func testEncodeISO8601ToXML() async throws { struct A: AWSEncodableShape { @CustomCoding var date: Date } @@ -192,7 +192,7 @@ class TimeStampTests: XCTestCase { XCTAssertEqual(request?.body.asString(), "2019-05-01T00:00:00.001Z") } - func testEncodeHTTPHeaderToJSON() { + func testEncodeHTTPHeaderToJSON() async throws { struct A: AWSEncodableShape { @CustomCoding var date: Date } @@ -202,7 +202,7 @@ class TimeStampTests: XCTestCase { XCTAssertEqual(request?.body.asString(), "{\"date\":\"Wed, 1 May 2019 00:00:00 GMT\"}") } - func testEncodeUnixEpochToJSON() { + func testEncodeUnixEpochToJSON() async throws { struct A: AWSEncodableShape { @CustomCoding var date: Date } @@ -211,27 +211,4 @@ class TimeStampTests: XCTestCase { XCTAssertNoThrow(request = try AWSRequest(operation: "test", path: "/", httpMethod: .GET, input: a, configuration: createServiceConfig())) XCTAssertEqual(request?.body.asString(), "{\"date\":23983978378}") } - - // MARK: Types used in tests - - struct AWSHTTPResponseImpl: AWSHTTPResponse { - let status: HTTPResponseStatus - let headers: HTTPHeaders - let body: ByteBuffer? - - init(status: HTTPResponseStatus, headers: HTTPHeaders, body: ByteBuffer?) { - self.status = status - self.headers = headers - self.body = body - } - - init(status: HTTPResponseStatus, headers: HTTPHeaders, bodyData: Data?) { - var body: ByteBuffer? - if let bodyData = bodyData { - body = ByteBufferAllocator().buffer(capacity: bodyData.count) - body?.writeBytes(bodyData) - } - self.init(status: status, headers: headers, body: body) - } - } }