Skip to content

Commit

Permalink
channelActive needs to happen before channelRead
Browse files Browse the repository at this point in the history
Motivation:

`readIfNeeded` was triggered straight after the channel was registered
but _before_ `channelActive` was called which is wrong.

Modifications:

Moved the calls to `readIfNeeded` into `becomeActive0` and removed them
everywhere else (except some error path).

Result:

handle ChannelHandler lifecycle more correctly.
  • Loading branch information
weissi committed Mar 21, 2018
1 parent 9a6f34f commit 4c2ff85
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 5 deletions.
8 changes: 3 additions & 5 deletions Sources/NIO/SocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,6 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
neverRegistered = false
promise?.succeed(result: ())
pipeline.fireChannelRegistered0()
readIfNeeded0()
} catch {
promise?.fail(error: error)
}
Expand Down Expand Up @@ -746,7 +745,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}
}

fileprivate func becomeActive0(promise: EventLoopPromise<Void>?) {
fileprivate final func becomeActive0(promise: EventLoopPromise<Void>?) {
assert(eventLoop.inEventLoop)
assert(!self.active.load())
assert(self._isOpen)
Expand All @@ -759,6 +758,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
promise.succeed(result: ())
}
pipeline.fireChannelActive0()
self.readIfNeeded0()
}

fileprivate func becomeInactive0(promise: EventLoopPromise<Void>?) {
Expand Down Expand Up @@ -860,6 +860,7 @@ final class SocketChannel: BaseSocketChannel<Socket> {
}

override fileprivate func readFromSocket() throws -> ReadResult {
assert(self.eventLoop.inEventLoop)
// Just allocate one time for the while read loop. This is fine as ByteBuffer is a struct and uses COW.
var buffer = recvAllocator.buffer(allocator: allocator)
var result = ReadResult.none
Expand Down Expand Up @@ -1104,7 +1105,6 @@ final class ServerSocketChannel: BaseSocketChannel<ServerSocket> {
p.futureResult.map {
// Its important to call the methods before we actual notify the original promise for ordering reasons.
self.becomeActive0(promise: promise)
self.readIfNeeded0()
}.whenFailure{ error in
promise?.fail(error: error)
}
Expand Down Expand Up @@ -1177,7 +1177,6 @@ final class ServerSocketChannel: BaseSocketChannel<ServerSocket> {
throw ChannelError.ioOnClosedChannel
}
ch.becomeActive0(promise: nil)
ch.readIfNeeded0()
}.whenFailure { error in
ch.close(promise: nil)
}
Expand Down Expand Up @@ -1389,7 +1388,6 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
try socket.bind(to: address)
self.updateCachedAddressesFromSocket(updateRemote: false)
becomeActive0(promise: promise)
readIfNeeded0()
} catch let err {
promise?.fail(error: err)
}
Expand Down
1 change: 1 addition & 0 deletions Tests/NIOTests/ChannelTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ extension ChannelTests {
("testNoChannelReadIfNoAutoRead", testNoChannelReadIfNoAutoRead),
("testEOFOnlyReceivedOnceReadRequested", testEOFOnlyReceivedOnceReadRequested),
("testAcceptsAfterCloseDontCauseIssues", testAcceptsAfterCloseDontCauseIssues),
("testChannelReadsDoesNotHappenAfterRegistration", testChannelReadsDoesNotHappenAfterRegistration),
]
}
}
Expand Down
88 changes: 88 additions & 0 deletions Tests/NIOTests/ChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1716,4 +1716,92 @@ public class ChannelTests: XCTestCase {
XCTFail("unexpected error: \(error)")
}
}

func testChannelReadsDoesNotHappenAfterRegistration() throws {
class ReadDoesNotHappen: ChannelInboundHandler {
typealias InboundIn = Any
private let hasRegisteredPromise: EventLoopPromise<()>
private let hasUnregisteredPromise: EventLoopPromise<()>
init(hasRegisteredPromise: EventLoopPromise<()>, hasUnregisteredPromise: EventLoopPromise<()>) {
self.hasRegisteredPromise = hasRegisteredPromise
self.hasUnregisteredPromise = hasUnregisteredPromise
}
func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
XCTFail("read has happened and shouldn't have")
}
func channelActive(ctx: ChannelHandlerContext) {
XCTFail("shouldn't have been activated")
}
func channelRegistered(ctx: ChannelHandlerContext) {
self.hasRegisteredPromise.succeed(result: ())
}
func channelUnregistered(ctx: ChannelHandlerContext) {
self.hasUnregisteredPromise.succeed(result: ())
}
}

let elg = MultiThreadedEventLoopGroup(numThreads: 1)
defer {
XCTAssertNoThrow(try elg.syncShutdownGracefully())
}
let sc = try SocketChannel(eventLoop: elg.next() as! SelectableEventLoop, protocolFamily: PF_INET)

class WriteImmediatelyHandler: ChannelInboundHandler {
typealias InboundIn = Any
typealias OutboundOut = ByteBuffer

private let writeDonePromise: EventLoopPromise<()>

init(writeDonePromise: EventLoopPromise<()>) {
self.writeDonePromise = writeDonePromise
}

func channelActive(ctx: ChannelHandlerContext) {
var buffer = ctx.channel.allocator.buffer(capacity: 4)
buffer.write(string: "foo")
ctx.writeAndFlush(NIOAny(buffer), promise: self.writeDonePromise)
}
}

let serverWriteHappenedPromise: EventLoopPromise<()> = elg.next().newPromise()
let clientHasRegistered: EventLoopPromise<()> = elg.next().newPromise()
let clientHasUnregistered: EventLoopPromise<()> = elg.next().newPromise()

let bootstrap = try! ServerBootstrap(group: elg)
.childChannelInitializer { channel in
channel.pipeline.add(handler: WriteImmediatelyHandler(writeDonePromise: serverWriteHappenedPromise))
}
.bind(host: "127.0.0.1", port: 0).wait()

// This is a bit ugly, we're trying to fabricate a situation that can happen in the real world which is that
// a socket is readable straight after becoming registered & connected.
// In here what we're doing is that we flip the order around and connect it first, make sure the server
// has written something and then on registration something is available to be read. However it should not
// be read yet (and in this case never will be) as we haven't send channelActive yet (which will never happen)
// in this example.
_ = try sc.selectable.connect(to: bootstrap.localAddress!)
try serverWriteHappenedPromise.futureResult.wait()
try sc.pipeline.add(handler: ReadDoesNotHappen(hasRegisteredPromise: clientHasRegistered,
hasUnregisteredPromise: clientHasUnregistered)).then {
// this will succeed and should not cause the socket to be read even though there'll be something
// available to be read immediately
sc.register()
}.then {
// this will fail as the socket is already pre-connected ;)
sc.connect(to: bootstrap.localAddress!).map {
XCTFail("should've failed")
}.mapIfError { error in
switch error {
case let e as IOError where e.errnoCode == EISCONN:
// OK
()
default:
XCTFail("wrong error \(error)")
}
}
}.wait()
try clientHasRegistered.futureResult.wait()
try sc.close().wait()
try clientHasUnregistered.futureResult.wait()
}
}

0 comments on commit 4c2ff85

Please sign in to comment.