diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift index ecff7afc7..f0aff762c 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift @@ -412,6 +412,7 @@ extension HTTP1ConnectionStateMachine.State { self = .closing newFinalAction = .close case .sendRequestEnd(let writePromise): + self = .idle newFinalAction = .sendRequestEnd(writePromise) case .none: self = .idle diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift index 63a1cf540..f2cc7b1d8 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift @@ -1267,7 +1267,11 @@ final class HTTP200DelayedHandler: ChannelInboundHandler { let request = self.unwrapInboundIn(data) switch request { case .head: - break + // Once we have received one response, all further requests are responded to immediately. + if self.pendingBodyParts == nil { + context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .http1_1, status: .ok))), promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + } case .body: if let pendingBodyParts = self.pendingBodyParts { if pendingBodyParts > 0 { diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift index d6fe77e47..a709cf2d6 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift @@ -130,6 +130,7 @@ extension HTTPClientTests { ("testWeCloseConnectionsWhenConnectionCloseSetByServer", testWeCloseConnectionsWhenConnectionCloseSetByServer), ("testBiDirectionalStreaming", testBiDirectionalStreaming), ("testBiDirectionalStreamingEarly200", testBiDirectionalStreamingEarly200), + ("testBiDirectionalStreamingEarly200DoesntPreventUsFromSendingMoreRequests", testBiDirectionalStreamingEarly200DoesntPreventUsFromSendingMoreRequests), ("testSynchronousHandshakeErrorReporting", testSynchronousHandshakeErrorReporting), ("testFileDownloadChunked", testFileDownloadChunked), ("testCloseWhileBackpressureIsExertedIsFine", testCloseWhileBackpressureIsExertedIsFine), diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index 726809415..e5d935fb9 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -3021,6 +3021,60 @@ class HTTPClientTests: XCTestCase { XCTAssertNil(try delegate.next().wait()) } + // This test is identical to the one above, except that we send another request immediately after. This is a regression + // test for https://github.com/swift-server/async-http-client/issues/595. + func testBiDirectionalStreamingEarly200DoesntPreventUsFromSendingMoreRequests() { + let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in HTTP200DelayedHandler(bodyPartsBeforeResponse: 1) } + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + let writeEL = eventLoopGroup.next() + + let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) + defer { XCTAssertNoThrow(try httpClient.syncShutdown()) } + + let body: HTTPClient.Body = .stream { writer in + let finalPromise = writeEL.makePromise(of: Void.self) + + func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) { + // always invoke from the wrong el to test thread safety + writeEL.preconditionInEventLoop() + + if index >= 30 { + return finalPromise.succeed(()) + } + + let sent = ByteBuffer(integer: index) + writer.write(.byteBuffer(sent)).whenComplete { result in + switch result { + case .success: + writeEL.execute { + writeLoop(writer, index: index + 1) + } + + case .failure(let error): + finalPromise.fail(error) + } + } + } + + writeEL.execute { + writeLoop(writer, index: 0) + } + + return finalPromise.futureResult + } + + let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)", body: body) + let future = httpClient.execute(request: request) + XCTAssertNoThrow(try future.wait()) + + // Try another request + let future2 = httpClient.execute(request: request) + XCTAssertNoThrow(try future2.wait()) + } + func testSynchronousHandshakeErrorReporting() throws { // This only affects cases where we use NIOSSL. guard !isTestingNIOTS() else { return }