Skip to content

Commit

Permalink
Synchronous connection failures must close channels.
Browse files Browse the repository at this point in the history
Motivation:

When a connect() call returns an error other than EINPROGRESS, we
currently leave the FD registered and don't close the channel. This is
wrong, we should take this as a signal to close the socket immediately,
rathern than letting the selector tell us this FD is dead: after all,
we know it's dead.

Modifications:

Call `close0()` in synchronous connect failures.

Result:

Faster failures!
  • Loading branch information
Lukasa committed Apr 18, 2018
1 parent 9326f74 commit 57e9460
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 33 deletions.
6 changes: 4 additions & 2 deletions Sources/NIO/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,6 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

do {
assert(self.lifecycleManager.isRegistered)
if try !connectSocket(to: address) {
// We aren't connected, we'll get the remote address later.
self.updateCachedAddressesFromSocket(updateLocal: true, updateRemote: false)
Expand All @@ -917,7 +916,10 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
becomeActive0(promise: promise)
}
} catch let error {
promise?.fail(error: error)
assert(self.lifecycleManager.isRegistered && !self.lifecycleManager.isActive)
// We're going to set the promise as the pending connect promise, and let close0 fail it for us.
self.pendingConnect = promise
self.close0(error: error, mode: .all, promise: nil)
}
}

Expand Down
1 change: 1 addition & 0 deletions Tests/NIOTests/ChannelTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ extension ChannelTests {
("testAppropriateAndInappropriateOperationsForUnregisteredSockets", testAppropriateAndInappropriateOperationsForUnregisteredSockets),
("testCloseSocketWhenReadErrorWasReceivedAndMakeSureNoReadCompleteArrives", testCloseSocketWhenReadErrorWasReceivedAndMakeSureNoReadCompleteArrives),
("testSocketFailingAsyncCorrectlyTearsTheChannelDownAndDoesntCrash", testSocketFailingAsyncCorrectlyTearsTheChannelDownAndDoesntCrash),
("testSocketErroringSynchronouslyCorrectlyTearsTheChannelDown", testSocketErroringSynchronouslyCorrectlyTearsTheChannelDown),
]
}
}
Expand Down
107 changes: 78 additions & 29 deletions Tests/NIOTests/ChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2142,45 +2142,49 @@ public class ChannelTests: XCTestCase {
}
}

class VerifyThingsAreRightHandler: ChannelInboundHandler {
typealias InboundIn = Never
private let allDone: EventLoopPromise<Void>
enum State {
case fresh
case registered
case unregistered
}
private var state: State = .fresh

init(allDone: EventLoopPromise<Void>) {
self.allDone = allDone
}
deinit { XCTAssertEqual(.unregistered, self.state) }

func channelActive(ctx: ChannelHandlerContext) { XCTFail("should never become active") }

func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { XCTFail("should never read") }
let group = MultiThreadedEventLoopGroup(numThreads: 2)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
let sc = try SocketChannel(socket: SocketFailingAsyncConnect(), eventLoop: group.next() as! SelectableEventLoop)

func channelReadComplete(ctx: ChannelHandlerContext) { XCTFail("should never readComplete") }
let serverChannel = try ServerBootstrap(group: group.next())
.bind(host: "127.0.0.1", port: 0)
.wait()
defer {
XCTAssertNoThrow(try serverChannel.syncCloseAcceptingAlreadyClosed())
}

func errorCaught(ctx: ChannelHandlerContext, error: Error) { XCTFail("pipeline shouldn't be told about connect error") }
let allDone: EventLoopPromise<Void> = group.next().newPromise()
XCTAssertNoThrow(try sc.eventLoop.submit {
sc.pipeline.add(handler: VerifyConnectionFailureHandler(allDone: allDone)).then {
sc.register().then {
sc.connect(to: serverChannel.localAddress!)
}
}
}.wait())
XCTAssertNoThrow(try allDone.futureResult.wait())
XCTAssertNoThrow(try sc.syncCloseAcceptingAlreadyClosed())
}

func channelRegistered(ctx: ChannelHandlerContext) {
XCTAssertEqual(.fresh, self.state)
self.state = .registered
func testSocketErroringSynchronouslyCorrectlyTearsTheChannelDown() throws {
// regression test for #322
enum DummyError: Error { case dummy }
class SocketFailingConnect: Socket {
init() throws {
try super.init(protocolFamily: PF_INET, type: Posix.SOCK_STREAM, setNonBlocking: true)
}

func channelUnregistered(ctx: ChannelHandlerContext) {
XCTAssertEqual(.registered, self.state)
self.state = .unregistered
self.allDone.succeed(result: ())
override func connect(to address: SocketAddress) throws -> Bool {
throw DummyError.dummy
}
}

let group = MultiThreadedEventLoopGroup(numThreads: 2)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
let sc = try SocketChannel(socket: SocketFailingAsyncConnect(), eventLoop: group.next() as! SelectableEventLoop)
let sc = try SocketChannel(socket: SocketFailingConnect(), eventLoop: group.next() as! SelectableEventLoop)

let serverChannel = try ServerBootstrap(group: group.next())
.bind(host: "127.0.0.1", port: 0)
Expand All @@ -2191,13 +2195,58 @@ public class ChannelTests: XCTestCase {

let allDone: EventLoopPromise<Void> = group.next().newPromise()
XCTAssertNoThrow(try sc.eventLoop.submit {
sc.pipeline.add(handler: VerifyThingsAreRightHandler(allDone: allDone)).then {
let f = sc.pipeline.add(handler: VerifyConnectionFailureHandler(allDone: allDone)).then {
sc.register().then {
sc.connect(to: serverChannel.localAddress!)
}
}
f.whenSuccess {
XCTFail("Must not succeed")
}
f.whenFailure { err in
XCTAssertEqual(err as? DummyError, .dummy)
}
// We can block here because connect must have failed synchronously.
XCTAssertTrue(f.isFulfilled)
}.wait())
XCTAssertNoThrow(try allDone.futureResult.wait())

XCTAssertNoThrow(try sc.closeFuture.wait())
XCTAssertNoThrow(try sc.syncCloseAcceptingAlreadyClosed())
}
}

fileprivate class VerifyConnectionFailureHandler: ChannelInboundHandler {
typealias InboundIn = Never
private let allDone: EventLoopPromise<Void>
enum State {
case fresh
case registered
case unregistered
}
private var state: State = .fresh

init(allDone: EventLoopPromise<Void>) {
self.allDone = allDone
}
deinit { XCTAssertEqual(.unregistered, self.state) }

func channelActive(ctx: ChannelHandlerContext) { XCTFail("should never become active") }

func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { XCTFail("should never read") }

func channelReadComplete(ctx: ChannelHandlerContext) { XCTFail("should never readComplete") }

func errorCaught(ctx: ChannelHandlerContext, error: Error) { XCTFail("pipeline shouldn't be told about connect error") }

func channelRegistered(ctx: ChannelHandlerContext) {
XCTAssertEqual(.fresh, self.state)
self.state = .registered
}

func channelUnregistered(ctx: ChannelHandlerContext) {
XCTAssertEqual(.registered, self.state)
self.state = .unregistered
self.allDone.succeed(result: ())
}
}
8 changes: 6 additions & 2 deletions Tests/NIOTests/SocketChannelTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,14 @@ public class SocketChannelTest : XCTestCase {
let serverChannel = try ServerBootstrap(group: group).bind(host: "127.0.0.1", port: 0).wait()
do {
try serverChannel.connect(to: serverChannel.localAddress!).wait()
XCTFail("Did not throw")
XCTAssertNoThrow(try serverChannel.close().wait())
} catch let err as ChannelError where err == .operationUnsupported {
// expected
// expected, no close here as the channel is already closed.
} catch {
XCTFail("Unexpected error \(error)")
XCTAssertNoThrow(try serverChannel.close().wait())
}
try serverChannel.close().wait()
}

public func testCloseDuringWriteFailure() throws {
Expand Down

0 comments on commit 57e9460

Please sign in to comment.