Skip to content

Commit

Permalink
Ensure correct ordering of promise notification when connect is still…
Browse files Browse the repository at this point in the history
… pending.

Motivation:

When a pending connect is still in process and the Channel is closed we need to ensure we use the correct ordering when notify the promises and ChannelHandlers in the pipeline.

The ordering should be:

- Notify pending connect promise
- Notify close promise
- Call channelInactive(...)

Modifications:

- Correct ordering of notification
- Add unit test

Result:

Ensure correct ordering when connect is still pending
  • Loading branch information
normanmaurer committed Apr 18, 2018
1 parent c3706cd commit abc35d2
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 5 deletions.
10 changes: 5 additions & 5 deletions Sources/NIO/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,11 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
// Transition our internal state.
let callouts = self.lifecycleManager.close(promise: p)

if let connectPromise = self.pendingConnect {
self.pendingConnect = nil
connectPromise.fail(error: error)
}

// Now that our state is sensible, we can call out to user code.
self.cancelWritesOnClose(error: error)
callouts(self.pipeline)
Expand All @@ -687,11 +692,6 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {

self.closePromise.succeed(result: ())
}

if let connectPromise = pendingConnect {
pendingConnect = nil
connectPromise.fail(error: error)
}
}


Expand Down
1 change: 1 addition & 0 deletions Tests/NIOTests/SocketChannelTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ extension SocketChannelTest {
("testWriteAndFlushServerSocketChannel", testWriteAndFlushServerSocketChannel),
("testConnectServerSocketChannel", testConnectServerSocketChannel),
("testCloseDuringWriteFailure", testCloseDuringWriteFailure),
("testPendingConnectNotificationOrder", testPendingConnectNotificationOrder),
]
}
}
Expand Down
86 changes: 86 additions & 0 deletions Tests/NIOTests/SocketChannelTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -297,4 +297,90 @@ public class SocketChannelTest : XCTestCase {
XCTFail("Unexpected error \(error)")
}
}

public func testPendingConnectNotificationOrder() throws {

class NotificationOrderHandler: ChannelDuplexHandler {
typealias InboundIn = Never
typealias OutboundIn = Never

private var connectPromise: EventLoopPromise<Void>?

public func channelInactive(ctx: ChannelHandlerContext) {
if let connectPromise = self.connectPromise {
XCTAssertTrue(connectPromise.futureResult.isFulfilled)
} else {
XCTFail("connect(...) not called before")
}
}

public func connect(ctx: ChannelHandlerContext, to address: SocketAddress, promise: EventLoopPromise<Void>?) {
XCTAssertNil(self.connectPromise)
self.connectPromise = promise
ctx.connect(to: address, promise: promise)
}

func handlerAdded(ctx: ChannelHandlerContext) {
XCTAssertNil(self.connectPromise)
}

func handlerRemoved(ctx: ChannelHandlerContext) {
if let connectPromise = self.connectPromise {
XCTAssertTrue(connectPromise.futureResult.isFulfilled)
} else {
XCTFail("connect(...) not called before")
}
}
}

let group = MultiThreadedEventLoopGroup(numThreads: 1)
defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) }

let eventLoop = group.next()
let promise: EventLoopPromise<Void> = eventLoop.newPromise()

class ConnectPendingSocket: Socket {
let promise: EventLoopPromise<Void>
init(promise: EventLoopPromise<Void>) throws {
self.promise = promise
try super.init(protocolFamily: PF_INET, type: Posix.SOCK_STREAM)
}

override func connect(to address: SocketAddress) throws -> Bool {
self.promise.succeed(result: ())
return false
}
}

let channel = try SocketChannel(socket: ConnectPendingSocket(promise: promise), parent: nil, eventLoop: eventLoop as! SelectableEventLoop)

XCTAssertNoThrow(try channel.pipeline.add(handler: NotificationOrderHandler()).then {
channel.register()
}.wait())

let connectPromise: EventLoopPromise<Void> = channel.eventLoop.newPromise()
let closePromise: EventLoopPromise<Void> = channel.eventLoop.newPromise()

channel.connect(to: try SocketAddress(ipAddress: "127.0.0.1", port: 9999), promise: connectPromise)

XCTAssertFalse(connectPromise.futureResult.isFulfilled)

closePromise.futureResult.whenComplete {
XCTAssertTrue(connectPromise.futureResult.isFulfilled)
}
connectPromise.futureResult.whenComplete {
XCTAssertFalse(closePromise.futureResult.isFulfilled)
}
channel.close(promise: closePromise)

do {
try connectPromise.futureResult.wait()
XCTFail("Did not throw")
} catch let err as ChannelError where err == .alreadyClosed {
// expected
}
XCTAssertNoThrow(try closePromise.futureResult.wait())
XCTAssertNoThrow(try channel.closeFuture.wait())
XCTAssertNoThrow(try promise.futureResult.wait())
}
}

0 comments on commit abc35d2

Please sign in to comment.