Skip to content

Commit

Permalink
Ensure correct ordering of promise notification when connect is still…
Browse files Browse the repository at this point in the history
… pending. (#330)

Motivation:

When a pending connect is still in process and the Channel is closed we need to ensure we use the correct ordering when notify the promises and ChannelHandlers in the pipeline.

The ordering should be:

- Notify pending connect promise
- Notify close promise
- Call channelInactive(...)

Modifications:

- Correct ordering of notification
- Add unit test

Result:

Ensure correct ordering when connect is still pending
  • Loading branch information
normanmaurer authored Apr 19, 2018
1 parent 338cfb5 commit 91ef938
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 5 deletions.
10 changes: 5 additions & 5 deletions Sources/NIO/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,11 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
// Transition our internal state.
let callouts = self.lifecycleManager.close(promise: p)

if let connectPromise = self.pendingConnect {
self.pendingConnect = nil
connectPromise.fail(error: error)
}

// Now that our state is sensible, we can call out to user code.
self.cancelWritesOnClose(error: error)
callouts(self.pipeline)
Expand All @@ -687,11 +692,6 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {

self.closePromise.succeed(result: ())
}

if let connectPromise = pendingConnect {
pendingConnect = nil
connectPromise.fail(error: error)
}
}


Expand Down
1 change: 1 addition & 0 deletions Tests/NIOTests/SocketChannelTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ extension SocketChannelTest {
("testCloseDuringWriteFailure", testCloseDuringWriteFailure),
("testWithConfiguredStreamSocket", testWithConfiguredStreamSocket),
("testWithConfiguredDatagramSocket", testWithConfiguredDatagramSocket),
("testPendingConnectNotificationOrder", testPendingConnectNotificationOrder),
]
}
}
Expand Down
96 changes: 96 additions & 0 deletions Tests/NIOTests/SocketChannelTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -350,4 +350,100 @@ public class SocketChannelTest : XCTestCase {

try serverChannel.close().wait()
}

public func testPendingConnectNotificationOrder() throws {

class NotificationOrderHandler: ChannelDuplexHandler {
typealias InboundIn = Never
typealias OutboundIn = Never

private var connectPromise: EventLoopPromise<Void>?

public func channelInactive(ctx: ChannelHandlerContext) {
if let connectPromise = self.connectPromise {
XCTAssertTrue(connectPromise.futureResult.isFulfilled)
} else {
XCTFail("connect(...) not called before")
}
}

public func connect(ctx: ChannelHandlerContext, to address: SocketAddress, promise: EventLoopPromise<Void>?) {
XCTAssertNil(self.connectPromise)
self.connectPromise = promise
ctx.connect(to: address, promise: promise)
}

func handlerAdded(ctx: ChannelHandlerContext) {
XCTAssertNil(self.connectPromise)
}

func handlerRemoved(ctx: ChannelHandlerContext) {
if let connectPromise = self.connectPromise {
XCTAssertTrue(connectPromise.futureResult.isFulfilled)
} else {
XCTFail("connect(...) not called before")
}
}
}

let group = MultiThreadedEventLoopGroup(numThreads: 1)
defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) }

let serverChannel = try ServerBootstrap(group: group).bind(host: "127.0.0.1", port: 0).wait()
defer { XCTAssertNoThrow(try serverChannel.close().wait()) }

let eventLoop = group.next()
let promise: EventLoopPromise<Void> = eventLoop.newPromise()

class ConnectPendingSocket: Socket {
let promise: EventLoopPromise<Void>
init(promise: EventLoopPromise<Void>) throws {
self.promise = promise
try super.init(protocolFamily: PF_INET, type: Posix.SOCK_STREAM)
}

override func connect(to address: SocketAddress) throws -> Bool {
// We want to return false here to have a pending connect.
_ = try super.connect(to: address)
self.promise.succeed(result: ())
return false
}
}

let channel = try SocketChannel(socket: ConnectPendingSocket(promise: promise), parent: nil, eventLoop: eventLoop as! SelectableEventLoop)
let connectPromise: EventLoopPromise<Void> = channel.eventLoop.newPromise()
let closePromise: EventLoopPromise<Void> = channel.eventLoop.newPromise()

closePromise.futureResult.whenComplete {
XCTAssertTrue(connectPromise.futureResult.isFulfilled)
}
connectPromise.futureResult.whenComplete {
XCTAssertFalse(closePromise.futureResult.isFulfilled)
}

XCTAssertNoThrow(try channel.pipeline.add(handler: NotificationOrderHandler()).wait())

// We need to call submit {...} here to ensure then {...} is called while on the EventLoop already to not have
// a ECONNRESET sneak in.
XCTAssertNoThrow(try channel.eventLoop.submit {
channel.register().map { () -> Void in
channel.connect(to: serverChannel.localAddress!, promise: connectPromise)
}.map { () -> Void in
XCTAssertFalse(connectPromise.futureResult.isFulfilled)
// The close needs to happen in the then { ... } block to ensure we close the channel
// before we have the chance to register it for .write.
channel.close(promise: closePromise)
}
}.wait().wait() as Void)

do {
try connectPromise.futureResult.wait()
XCTFail("Did not throw")
} catch let err as ChannelError where err == .alreadyClosed {
// expected
}
XCTAssertNoThrow(try closePromise.futureResult.wait())
XCTAssertNoThrow(try channel.closeFuture.wait())
XCTAssertNoThrow(try promise.futureResult.wait())
}
}

0 comments on commit 91ef938

Please sign in to comment.