@@ -97,9 +97,17 @@ internal final class HTTPClient {
9797 private func connect( ) -> EventLoopFuture < Channel > {
9898 let bootstrap = ClientBootstrap ( group: self . eventLoop)
9999 . channelInitializer { channel in
100- channel. pipeline. addHTTPClientHandlers ( ) . flatMap {
101- channel. pipeline. addHandlers ( [ HTTPHandler ( keepAlive: self . configuration. keepAlive) ,
102- UnaryHandler ( keepAlive: self . configuration. keepAlive) ] )
100+ do {
101+ try channel. pipeline. syncOperations. addHTTPClientHandlers ( )
102+ // Lambda quotas... An invocation payload is maximal 6MB in size:
103+ // https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html
104+ try channel. pipeline. syncOperations. addHandler (
105+ NIOHTTPClientResponseAggregator ( maxContentLength: 6 * 1024 * 1024 ) )
106+ try channel. pipeline. syncOperations. addHandler (
107+ UnaryHandler ( keepAlive: self . configuration. keepAlive) )
108+ return channel. eventLoop. makeSucceededFuture ( ( ) )
109+ } catch {
110+ return channel. eventLoop. makeFailedFuture ( error)
103111 }
104112 }
105113
@@ -149,116 +157,54 @@ internal final class HTTPClient {
149157 }
150158}
151159
152- private final class HTTPHandler : ChannelDuplexHandler {
153- typealias OutboundIn = HTTPClient . Request
154- typealias InboundOut = HTTPClient . Response
155- typealias InboundIn = HTTPClientResponsePart
160+ // no need in locks since we validate only one request can run at a time
161+ private final class UnaryHandler : ChannelDuplexHandler {
162+ typealias InboundIn = NIOHTTPClientResponseFull
163+ typealias OutboundIn = HTTPRequestWrapper
156164 typealias OutboundOut = HTTPClientRequestPart
157165
158166 private let keepAlive : Bool
159- private var readState : ReadState = . idle
167+
168+ private var pending : ( promise: EventLoopPromise < HTTPClient . Response > , timeout: Scheduled < Void > ? ) ?
169+ private var lastError : Error ?
160170
161171 init ( keepAlive: Bool ) {
162172 self . keepAlive = keepAlive
163173 }
164174
165175 func write( context: ChannelHandlerContext , data: NIOAny , promise: EventLoopPromise < Void > ? ) {
166- let request = unwrapOutboundIn ( data)
167-
168- var head = HTTPRequestHead ( version: . init( major: 1 , minor: 1 ) , method: request. method, uri: request. url, headers: request. headers)
169- head. headers. add ( name: " host " , value: request. targetHost)
170- switch request. method {
176+ guard self . pending == nil else {
177+ preconditionFailure ( " invalid state, outstanding request " )
178+ }
179+ let wrapper = unwrapOutboundIn ( data)
180+
181+ var head = HTTPRequestHead (
182+ version: . http1_1,
183+ method: wrapper. request. method,
184+ uri: wrapper. request. url,
185+ headers: wrapper. request. headers)
186+ head. headers. add ( name: " host " , value: wrapper. request. targetHost)
187+ switch head. method {
171188 case . POST, . PUT:
172- head. headers. add ( name: " content-length " , value: String ( request. body? . readableBytes ?? 0 ) )
189+ head. headers. add ( name: " content-length " , value: String ( wrapper . request. body? . readableBytes ?? 0 ) )
173190 default :
174191 break
175192 }
176193
177194 // We don't add a "Connection" header here if we want to keep the connection open,
178- // HTTP/1.1 defines specifies the following in RFC 2616 , Section 8.1.2.1 :
195+ // HTTP/1.1 specified in RFC 7230 , Section 6.3 Persistence :
179196 //
180- // An HTTP/1.1 server MAY assume that a HTTP/1.1 client intends to
181- // maintain a persistent connection unless a Connection header including
182- // the connection-token "close" was sent in the request. If the server
183- // chooses to close the connection immediately after sending the
184- // response, it SHOULD send a Connection header including the
185- // connection-token close.
197+ // HTTP/1.1 defaults to the use of "persistent connections", allowing
198+ // multiple requests and responses to be carried over a single
199+ // connection. The "close" connection option is used to signal that a
200+ // connection will not persist after the current request/response. HTTP
201+ // implementations SHOULD support persistent connections.
186202 //
187203 // See also UnaryHandler.channelRead below.
188204 if !self . keepAlive {
189205 head. headers. add ( name: " connection " , value: " close " )
190206 }
191207
192- context. write ( self . wrapOutboundOut ( HTTPClientRequestPart . head ( head) ) ) . flatMap { _ -> EventLoopFuture < Void > in
193- if let body = request. body {
194- return context. writeAndFlush ( self . wrapOutboundOut ( HTTPClientRequestPart . body ( . byteBuffer( body) ) ) )
195- } else {
196- context. flush ( )
197- return context. eventLoop. makeSucceededFuture ( ( ) )
198- }
199- } . cascade ( to: promise)
200- }
201-
202- func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
203- let response = unwrapInboundIn ( data)
204-
205- switch response {
206- case . head( let head) :
207- guard case . idle = self . readState else {
208- preconditionFailure ( " invalid read state \( self . readState) " )
209- }
210- self . readState = . head( head)
211- case . body( var bodyPart) :
212- switch self . readState {
213- case . head( let head) :
214- self . readState = . body( head, bodyPart)
215- case . body( let head, var body) :
216- body. writeBuffer ( & bodyPart)
217- self . readState = . body( head, body)
218- default :
219- preconditionFailure ( " invalid read state \( self . readState) " )
220- }
221- case . end:
222- switch self . readState {
223- case . head( let head) :
224- context. fireChannelRead ( wrapInboundOut ( HTTPClient . Response ( version: head. version, status: head. status, headers: head. headers, body: nil ) ) )
225- self . readState = . idle
226- case . body( let head, let body) :
227- context. fireChannelRead ( wrapInboundOut ( HTTPClient . Response ( version: head. version, status: head. status, headers: head. headers, body: body) ) )
228- self . readState = . idle
229- default :
230- preconditionFailure ( " invalid read state \( self . readState) " )
231- }
232- }
233- }
234-
235- private enum ReadState {
236- case idle
237- case head( HTTPResponseHead )
238- case body( HTTPResponseHead , ByteBuffer )
239- }
240- }
241-
242- // no need in locks since we validate only one request can run at a time
243- private final class UnaryHandler : ChannelDuplexHandler {
244- typealias OutboundIn = HTTPRequestWrapper
245- typealias InboundIn = HTTPClient . Response
246- typealias OutboundOut = HTTPClient . Request
247-
248- private let keepAlive : Bool
249-
250- private var pending : ( promise: EventLoopPromise < HTTPClient . Response > , timeout: Scheduled < Void > ? ) ?
251- private var lastError : Error ?
252-
253- init ( keepAlive: Bool ) {
254- self . keepAlive = keepAlive
255- }
256-
257- func write( context: ChannelHandlerContext , data: NIOAny , promise: EventLoopPromise < Void > ? ) {
258- guard self . pending == nil else {
259- preconditionFailure ( " invalid state, outstanding request " )
260- }
261- let wrapper = unwrapOutboundIn ( data)
262208 let timeoutTask = wrapper. request. timeout. map {
263209 context. eventLoop. scheduleTask ( in: $0) {
264210 if self . pending != nil {
@@ -267,15 +213,29 @@ private final class UnaryHandler: ChannelDuplexHandler {
267213 }
268214 }
269215 self . pending = ( promise: wrapper. promise, timeout: timeoutTask)
270- context. writeAndFlush ( wrapOutboundOut ( wrapper. request) , promise: promise)
216+
217+ context. write ( wrapOutboundOut ( . head( head) ) , promise: nil )
218+ if let body = wrapper. request. body {
219+ context. write ( wrapOutboundOut ( . body( IOData . byteBuffer ( body) ) ) , promise: nil )
220+ }
221+ context. writeAndFlush ( wrapOutboundOut ( . end( nil ) ) , promise: promise)
271222 }
272223
273224 func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
274- let response = unwrapInboundIn ( data)
275225 guard let pending = self . pending else {
276226 preconditionFailure ( " invalid state, no pending request " )
277227 }
278-
228+
229+ let response = unwrapInboundIn ( data)
230+
231+ let httpResponse = HTTPClient . Response (
232+ version: response. head. version,
233+ status: response. head. status,
234+ headers: response. head. headers,
235+ body: response. body)
236+
237+ self . completeWith ( . success( httpResponse) )
238+
279239 // As defined in RFC 7230 Section 6.3:
280240 // HTTP/1.1 defaults to the use of "persistent connections", allowing
281241 // multiple requests and responses to be carried over a single
@@ -285,14 +245,14 @@ private final class UnaryHandler: ChannelDuplexHandler {
285245 //
286246 // That's why we only assume the connection shall be closed if we receive
287247 // a "connection = close" header.
288- let serverCloseConnection = response. headers. first ( name: " connection " ) ? . lowercased ( ) == " close "
248+ let serverCloseConnection =
249+ response. head. headers [ " connection " ] . contains ( where: { $0. lowercased ( ) == " close " } )
289250
290- if !self . keepAlive || serverCloseConnection || response. version != . init ( major : 1 , minor : 1 ) {
251+ if !self . keepAlive || serverCloseConnection || response. head . version != . http1_1 {
291252 pending. promise. futureResult. whenComplete { _ in
292253 _ = context. channel. close ( )
293254 }
294255 }
295- self . completeWith ( . success( response) )
296256 }
297257
298258 func errorCaught( context: ChannelHandlerContext , error: Error ) {
0 commit comments