From 05f3409af0b395f7937ae2dd6ab6daec5f0a37bd Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Mon, 18 May 2020 20:49:11 +0100 Subject: [PATCH 1/7] draft for streaming el fixes --- Sources/AsyncHTTPClient/HTTPHandler.swift | 30 +++++++++++++++---- .../HTTPClientTests.swift | 22 ++++++++++++-- 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index 6adb55342..f9df03ba2 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -765,12 +765,32 @@ extension TaskHandler: ChannelDuplexHandler { return context.eventLoop.makeSucceededFuture(()) } - return body.stream(HTTPClient.Body.StreamWriter { part in - context.eventLoop.assertInEventLoop() - return context.writeAndFlush(self.wrapOutboundOut(.body(part))).map { - self.callOutToDelegateFireAndForget(value: part, self.delegate.didSendRequestPart) + func doIt() -> EventLoopFuture { + body.stream(HTTPClient.Body.StreamWriter { part in + self.task.eventLoop.assertInEventLoop() + func doIt() -> EventLoopFuture { + return context.writeAndFlush(self.wrapOutboundOut(.body(part))).map { + self.callOutToDelegateFireAndForget(value: part, self.delegate.didSendRequestPart) + } + } + + if context.eventLoop.inEventLoop { + return doIt().hop(to: self.task.eventLoop) + } else { + return context.eventLoop.flatSubmit { + doIt() + }.hop(to: self.task.eventLoop) + } + }) + } + + if self.task.eventLoop.inEventLoop { + return doIt() + } else { + return self.task.eventLoop.flatSubmit { + doIt() } - }) + } } public func read(context: ChannelHandlerContext) { diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index 7155d8011..16e9c7eea 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -447,22 +447,40 @@ class HTTPClientTests: XCTestCase { } func testUploadStreaming() throws { + let group = getDefaultEventLoopGroup(numberOfThreads: 4) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + let httpBin = HTTPBin() - let httpClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup)) + let httpClient = HTTPClient(eventLoopGroupProvider: .shared(group)) defer { XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true)) XCTAssertNoThrow(try httpBin.shutdown()) } + let el1 = group.next() + let el2 = group.next() + XCTAssertFalse(el1 === el2) + let body: HTTPClient.Body = .stream(length: 8) { writer in + XCTAssert(el1.inEventLoop) let buffer = ByteBuffer.of(string: "1234") return writer.write(.byteBuffer(buffer)).flatMap { + XCTAssert(el1.inEventLoop) let buffer = ByteBuffer.of(string: "4321") return writer.write(.byteBuffer(buffer)) } } - let response = try httpClient.post(url: "http://localhost:\(httpBin.port)/post", body: body).wait() + do { + // Pre-populate pool with a connection on a different EL + let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/get", method: .GET) + XCTAssertNoThrow(try httpClient.execute(request: request, delegate: ResponseAccumulator(request: request), eventLoop: .delegateAndChannel(on: el2)).wait()) + } + + let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/post", method: .POST, body: body) + let response = try httpClient.execute(request: request, delegate: ResponseAccumulator(request: request), eventLoop: .delegate(on: el1)).wait() let bytes = response.body.flatMap { $0.getData(at: 0, length: $0.readableBytes) } let data = try JSONDecoder().decode(RequestInfo.self, from: bytes!) From 8766d499bc1487fecaf06fb338773fbf4ba52ae7 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Mon, 18 May 2020 20:54:48 +0100 Subject: [PATCH 2/7] fix compilation --- Sources/AsyncHTTPClient/HTTPHandler.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index f9df03ba2..3763336f6 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -766,7 +766,7 @@ extension TaskHandler: ChannelDuplexHandler { } func doIt() -> EventLoopFuture { - body.stream(HTTPClient.Body.StreamWriter { part in + return body.stream(HTTPClient.Body.StreamWriter { part in self.task.eventLoop.assertInEventLoop() func doIt() -> EventLoopFuture { return context.writeAndFlush(self.wrapOutboundOut(.body(part))).map { From b4bb0765007b55e35df3a0c00b7ba12887d8de48 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Wed, 20 May 2020 12:38:00 +0100 Subject: [PATCH 3/7] remove hack --- Sources/AsyncHTTPClient/HTTPHandler.swift | 20 +++++++++---------- .../HTTPClientTests.swift | 16 +++------------ 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index b9b58eeb0..bb6779a91 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -767,23 +767,23 @@ extension TaskHandler: ChannelDuplexHandler { func doIt() -> EventLoopFuture { return body.stream(HTTPClient.Body.StreamWriter { part in - self.task.eventLoop.assertInEventLoop() - func doIt() -> EventLoopFuture { - return context.writeAndFlush(self.wrapOutboundOut(.body(part))).map { - self.callOutToDelegateFireAndForget(value: part, self.delegate.didSendRequestPart) + let promise = self.task.eventLoop.makePromise(of: Void.self) + // All writes have to be switched to the channel EL if channel and task ELs differ + if context.eventLoop.inEventLoop { + context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise) + } else { + context.eventLoop.execute { + context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise) } } - if context.eventLoop.inEventLoop { - return doIt().hop(to: self.task.eventLoop) - } else { - return context.eventLoop.flatSubmit { - doIt() - }.hop(to: self.task.eventLoop) + return promise.futureResult.map { + self.callOutToDelegateFireAndForget(value: part, self.delegate.didSendRequestPart) } }) } + // Callout to the user to start body streaming should be on task EL if self.task.eventLoop.inEventLoop { return doIt() } else { diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index cb21e365c..a377c8b4d 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -1771,10 +1771,7 @@ class HTTPClientTests: XCTestCase { let bodyPromises = (0..<16).map { _ in group.next().makePromise(of: ByteBuffer.self) } let endPromise = group.next().makePromise(of: Void.self) let sentOffAllBodyPartsPromise = group.next().makePromise(of: Void.self) - // Because of https://github.com/swift-server/async-http-client/issues/200 we also need to pull off a terrible - // hack and get the internal EventLoop out :(. Once the bug is fixed, this promise should only get the - // StreamWriter. - let streamWriterPromise = group.next().makePromise(of: (EventLoop, HTTPClient.Body.StreamWriter).self) + let streamWriterPromise = group.next().makePromise(of: HTTPClient.Body.StreamWriter.self) func makeServer() -> Channel? { return try? ServerBootstrap(group: group) @@ -1799,12 +1796,7 @@ class HTTPClientTests: XCTestCase { method: .POST, headers: ["transfer-encoding": "chunked"], body: .stream { streamWriter in - // Due to https://github.com/swift-server/async-http-client/issues/200 - // we also need to pull off a terrible hack and get the internal - // EventLoop out :(. Once the bug is fixed, this promise should only get - // the StreamWriter. - let currentEL = MultiThreadedEventLoopGroup.currentEventLoop! // HACK!! - streamWriterPromise.succeed((currentEL, streamWriter)) + streamWriterPromise.succeed((streamWriter)) return sentOffAllBodyPartsPromise.futureResult }) } @@ -1829,9 +1821,7 @@ class HTTPClientTests: XCTestCase { buffer.clear() buffer.writeString(String(bodyChunkNumber, radix: 16)) XCTAssertEqual(1, buffer.readableBytes) - XCTAssertNoThrow(try streamWriter.0.flatSubmit { - streamWriter.1.write(.byteBuffer(buffer)) - }.wait()) + XCTAssertNoThrow(try streamWriter.write(.byteBuffer(buffer)).wait()) XCTAssertNoThrow(XCTAssertEqual(buffer, try bodyPromises[bodyChunkNumber].futureResult.wait())) } sentOffAllBodyPartsPromise.succeed(()) From b4abced6b0a714c94d6777f76dd519268f208c59 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Wed, 20 May 2020 12:39:41 +0100 Subject: [PATCH 4/7] fix formatting --- Tests/AsyncHTTPClientTests/HTTPClientTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index a377c8b4d..e84487aa0 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -1796,7 +1796,7 @@ class HTTPClientTests: XCTestCase { method: .POST, headers: ["transfer-encoding": "chunked"], body: .stream { streamWriter in - streamWriterPromise.succeed((streamWriter)) + streamWriterPromise.succeed(streamWriter) return sentOffAllBodyPartsPromise.futureResult }) } From 075c76873f33f87238773e560ee1dbe2961ac362 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Wed, 20 May 2020 14:02:05 +0100 Subject: [PATCH 5/7] split tests --- .../HTTPClientTests.swift | 80 ++++++++++++++----- 1 file changed, 60 insertions(+), 20 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index e84487aa0..56f7559e3 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -447,40 +447,22 @@ class HTTPClientTests: XCTestCase { } func testUploadStreaming() throws { - let group = getDefaultEventLoopGroup(numberOfThreads: 4) - defer { - XCTAssertNoThrow(try group.syncShutdownGracefully()) - } - let httpBin = HTTPBin() - let httpClient = HTTPClient(eventLoopGroupProvider: .shared(group)) + let httpClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup)) defer { XCTAssertNoThrow(try httpClient.syncShutdown()) XCTAssertNoThrow(try httpBin.shutdown()) } - let el1 = group.next() - let el2 = group.next() - XCTAssertFalse(el1 === el2) - let body: HTTPClient.Body = .stream(length: 8) { writer in - XCTAssert(el1.inEventLoop) let buffer = ByteBuffer.of(string: "1234") return writer.write(.byteBuffer(buffer)).flatMap { - XCTAssert(el1.inEventLoop) let buffer = ByteBuffer.of(string: "4321") return writer.write(.byteBuffer(buffer)) } } - do { - // Pre-populate pool with a connection on a different EL - let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/get", method: .GET) - XCTAssertNoThrow(try httpClient.execute(request: request, delegate: ResponseAccumulator(request: request), eventLoop: .delegateAndChannel(on: el2)).wait()) - } - - let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/post", method: .POST, body: body) - let response = try httpClient.execute(request: request, delegate: ResponseAccumulator(request: request), eventLoop: .delegate(on: el1)).wait() + let response = try httpClient.post(url: "http://localhost:\(httpBin.port)/post", body: body).wait() let bytes = response.body.flatMap { $0.getData(at: 0, length: $0.readableBytes) } let data = try JSONDecoder().decode(RequestInfo.self, from: bytes!) @@ -1828,4 +1810,62 @@ class HTTPClientTests: XCTestCase { XCTAssertNoThrow(try endPromise.futureResult.wait()) XCTAssertNoThrow(try runningRequest.wait()) } + + func testUploadStreamingCallinToleratedFromOtsideEL() throws { + let httpBin = HTTPBin() + let httpClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup)) + defer { + XCTAssertNoThrow(try httpClient.syncShutdown()) + XCTAssertNoThrow(try httpBin.shutdown()) + } + + let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/get", method: .POST, body: .stream(length: 4) { writer in + let promise = httpClient.eventLoopGroup.next().makePromise(of: Void.self) + // We have to toleare callins from any thread + DispatchQueue(label: "upload-streaming").async { + writer.write(.byteBuffer(ByteBuffer.of(string: "1234"))).whenComplete { _ in + promise.succeed(()) + } + } + return promise.futureResult + }) + XCTAssertNoThrow(try httpClient.execute(request: request).wait()) + } + + func testUploadStreamingIsCalledOnTaskEL() throws { + let group = getDefaultEventLoopGroup(numberOfThreads: 4) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + + let httpBin = HTTPBin() + let httpClient = HTTPClient(eventLoopGroupProvider: .shared(group)) + defer { + XCTAssertNoThrow(try httpClient.syncShutdown()) + XCTAssertNoThrow(try httpBin.shutdown()) + } + + let el1 = group.next() + let el2 = group.next() + XCTAssertFalse(el1 === el2) + + do { + // Pre-populate pool with a connection on a different EL + let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/get", method: .GET) + XCTAssertNoThrow(try httpClient.execute(request: request, delegate: ResponseAccumulator(request: request), eventLoop: .delegateAndChannel(on: el2)).wait()) + } + + let body: HTTPClient.Body = .stream(length: 8) { writer in + XCTAssert(el1.inEventLoop) + let buffer = ByteBuffer.of(string: "1234") + return writer.write(.byteBuffer(buffer)).flatMap { + XCTAssert(el1.inEventLoop) + let buffer = ByteBuffer.of(string: "4321") + return writer.write(.byteBuffer(buffer)) + } + } + let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/post", method: .POST, body: body) + let response = httpClient.execute(request: request, delegate: ResponseAccumulator(request: request), eventLoop: .delegate(on: el1)) + XCTAssertNoThrow(try response.wait()) + } } From 1cd8dd0df341e6217d180ede8e43edb0526d1e73 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Wed, 20 May 2020 14:06:34 +0100 Subject: [PATCH 6/7] fix test server shutdown --- Tests/AsyncHTTPClientTests/HTTPClientTests.swift | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index 56f7559e3..0d045d551 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -1705,6 +1705,7 @@ class HTTPClientTests: XCTestCase { private let bodyPromises: [EventLoopPromise] private let endPromise: EventLoopPromise private var bodyPartsSeenSoFar = 0 + private var atEnd = false init(headPromise: EventLoopPromise, bodyPromises: [EventLoopPromise], @@ -1727,10 +1728,14 @@ class HTTPClientTests: XCTestCase { context.write(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))), promise: nil) context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: self.endPromise) + self.atEnd = true } } func handlerRemoved(context: ChannelHandlerContext) { + guard !self.atEnd else { + return + } struct NotFulfilledError: Error {} self.headPromise.fail(NotFulfilledError()) From 435c03e381533b2b3e14febdd02662dbd6f27429 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Wed, 20 May 2020 14:11:16 +0100 Subject: [PATCH 7/7] fix linuxtests --- Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift index 7dcaaf200..8cb1f5b7f 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift @@ -98,6 +98,8 @@ extension HTTPClientTests { ("testAsyncShutdown", testAsyncShutdown), ("testValidationErrorsAreSurfaced", testValidationErrorsAreSurfaced), ("testUploadsReallyStream", testUploadsReallyStream), + ("testUploadStreamingCallinToleratedFromOtsideEL", testUploadStreamingCallinToleratedFromOtsideEL), + ("testUploadStreamingIsCalledOnTaskEL", testUploadStreamingIsCalledOnTaskEL), ] } }