22//
33// This source file is part of the SwiftAWSLambdaRuntime open source project
44//
5- // Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
5+ // Copyright (c) 2017-2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
66// Licensed under Apache License v2.0
77//
88// See LICENSE.txt for license information
@@ -103,8 +103,7 @@ internal final class HTTPClient {
103103 // https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
104104 try channel. pipeline. syncOperations. addHandler (
105105 NIOHTTPClientResponseAggregator ( maxContentLength: 6 * 1024 * 1024 ) )
106- try channel. pipeline. syncOperations. addHandler (
107- UnaryHandler ( keepAlive: self . configuration. keepAlive) )
106+ try channel. pipeline. syncOperations. addHandler ( UnaryHandler ( ) )
108107 return channel. eventLoop. makeSucceededFuture ( ( ) )
109108 } catch {
110109 return channel. eventLoop. makeFailedFuture ( error)
@@ -139,10 +138,10 @@ internal final class HTTPClient {
139138 }
140139
141140 internal struct Response : Equatable {
142- public var version : HTTPVersion
143- public var status : HTTPResponseStatus
144- public var headers : HTTPHeaders
145- public var body : ByteBuffer ?
141+ var version : HTTPVersion
142+ var status : HTTPResponseStatus
143+ var headers : HTTPHeaders
144+ var body : ByteBuffer ?
146145 }
147146
148147 internal enum Errors : Error {
@@ -163,26 +162,29 @@ private final class UnaryHandler: ChannelDuplexHandler {
163162 typealias OutboundIn = HTTPRequestWrapper
164163 typealias OutboundOut = HTTPClientRequestPart
165164
166- private let keepAlive : Bool
165+ enum State {
166+ case idle
167+ case running( promise: EventLoopPromise < HTTPClient . Response > , timeout: Scheduled < Void > ? )
168+ case waitForConnectionClose( HTTPClient . Response , EventLoopPromise < HTTPClient . Response > )
169+ }
167170
168- private var pending : ( promise : EventLoopPromise < HTTPClient . Response > , timeout : Scheduled < Void > ? ) ?
171+ private var state : State = . idle
169172 private var lastError : Error ?
170173
171- init ( keepAlive: Bool ) {
172- self . keepAlive = keepAlive
173- }
174+ init ( ) { }
174175
175176 func write( context: ChannelHandlerContext , data: NIOAny , promise: EventLoopPromise < Void > ? ) {
176- guard self . pending == nil else {
177+ guard case . idle = self . state else {
177178 preconditionFailure ( " invalid state, outstanding request " )
178179 }
179180 let wrapper = unwrapOutboundIn ( data)
180-
181+
181182 var head = HTTPRequestHead (
182183 version: . http1_1,
183184 method: wrapper. request. method,
184185 uri: wrapper. request. url,
185- headers: wrapper. request. headers)
186+ headers: wrapper. request. headers
187+ )
186188 head. headers. add ( name: " host " , value: wrapper. request. targetHost)
187189 switch head. method {
188190 case . POST, . PUT:
@@ -191,29 +193,17 @@ private final class UnaryHandler: ChannelDuplexHandler {
191193 break
192194 }
193195
194- // We don't add a "Connection" header here if we want to keep the connection open,
195- // HTTP/1.1 specified in RFC 7230, Section 6.3 Persistence:
196- //
197- // HTTP/1.1 defaults to the use of "persistent connections", allowing
198- // multiple requests and responses to be carried over a single
199- // connection. The "close" connection option is used to signal that a
200- // connection will not persist after the current request/response. HTTP
201- // implementations SHOULD support persistent connections.
202- //
203- // See also UnaryHandler.channelRead below.
204- if !self . keepAlive {
205- head. headers. add ( name: " connection " , value: " close " )
206- }
207-
208196 let timeoutTask = wrapper. request. timeout. map {
209197 context. eventLoop. scheduleTask ( in: $0) {
210- if self . pending != nil {
211- context . pipeline . fireErrorCaught ( HTTPClient . Errors . timeout )
198+ guard case . running = self . state else {
199+ preconditionFailure ( " invalid state " )
212200 }
201+
202+ context. pipeline. fireErrorCaught ( HTTPClient . Errors. timeout)
213203 }
214204 }
215- self . pending = ( promise: wrapper. promise, timeout: timeoutTask)
216-
205+ self . state = . running ( promise: wrapper. promise, timeout: timeoutTask)
206+
217207 context. write ( wrapOutboundOut ( . head( head) ) , promise: nil )
218208 if let body = wrapper. request. body {
219209 context. write ( wrapOutboundOut ( . body( IOData . byteBuffer ( body) ) ) , promise: nil )
@@ -222,20 +212,21 @@ private final class UnaryHandler: ChannelDuplexHandler {
222212 }
223213
224214 func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
225- guard let pending = self . pending else {
215+ guard case . running ( let promise , let timeout ) = self . state else {
226216 preconditionFailure ( " invalid state, no pending request " )
227217 }
228-
218+
229219 let response = unwrapInboundIn ( data)
230-
220+
231221 let httpResponse = HTTPClient . Response (
232222 version: response. head. version,
233223 status: response. head. status,
234224 headers: response. head. headers,
235- body: response. body)
236-
237- self . completeWith ( . success( httpResponse) )
238-
225+ body: response. body
226+ )
227+
228+ timeout? . cancel ( )
229+
239230 // As defined in RFC 7230 Section 6.3:
240231 // HTTP/1.1 defaults to the use of "persistent connections", allowing
241232 // multiple requests and responses to be carried over a single
@@ -248,10 +239,15 @@ private final class UnaryHandler: ChannelDuplexHandler {
248239 let serverCloseConnection =
249240 response. head. headers [ " connection " ] . contains ( where: { $0. lowercased ( ) == " close " } )
250241
251- if !self . keepAlive || serverCloseConnection || response. head. version != . http1_1 {
252- pending. promise. futureResult. whenComplete { _ in
253- _ = context. channel. close ( )
254- }
242+ let closeConnection = serverCloseConnection || response. head. version != . http1_1
243+
244+ if closeConnection {
245+ self . state = . waitForConnectionClose( httpResponse, promise)
246+ _ = context. channel. close ( )
247+ return
248+ } else {
249+ self . state = . idle
250+ promise. succeed ( httpResponse)
255251 }
256252 }
257253
@@ -263,36 +259,44 @@ private final class UnaryHandler: ChannelDuplexHandler {
263259
264260 func channelInactive( context: ChannelHandlerContext ) {
265261 // fail any pending responses with last error or assume peer disconnected
266- if self . pending != nil {
267- let error = self . lastError ?? HTTPClient . Errors. connectionResetByPeer
268- self . completeWith ( . failure( error) )
269- }
270262 context. fireChannelInactive ( )
263+
264+ switch self . state {
265+ case . idle:
266+ break
267+ case . running( let promise, let timeout) :
268+ self . state = . idle
269+ timeout? . cancel ( )
270+ promise. fail ( self . lastError ?? HTTPClient . Errors. connectionResetByPeer)
271+
272+ case . waitForConnectionClose( let response, let promise) :
273+ self . state = . idle
274+ promise. succeed ( response)
275+ }
271276 }
272277
273278 func triggerUserOutboundEvent( context: ChannelHandlerContext , event: Any , promise: EventLoopPromise < Void > ? ) {
274279 switch event {
275280 case is RequestCancelEvent :
276- if self . pending != nil {
277- self . completeWith ( . failure( HTTPClient . Errors. cancelled) )
281+ switch self . state {
282+ case . idle:
283+ break
284+ case . running( let promise, let timeout) :
285+ self . state = . idle
286+ timeout? . cancel ( )
287+ promise. fail ( HTTPClient . Errors. cancelled)
288+
278289 // after the cancel error has been send, we want to close the connection so
279290 // that no more packets can be read on this connection.
280291 _ = context. channel. close ( )
292+ case . waitForConnectionClose( _, let promise) :
293+ self . state = . idle
294+ promise. fail ( HTTPClient . Errors. cancelled)
281295 }
282296 default :
283297 context. triggerUserOutboundEvent ( event, promise: promise)
284298 }
285299 }
286-
287- private func completeWith( _ result: Result < HTTPClient . Response , Error > ) {
288- guard let pending = self . pending else {
289- preconditionFailure ( " invalid state, no pending request " )
290- }
291- self . pending = nil
292- self . lastError = nil
293- pending. timeout? . cancel ( )
294- pending. promise. completeWith ( result)
295- }
296300}
297301
298302private struct HTTPRequestWrapper {
0 commit comments