Skip to content

Commit

Permalink
don't deliver events for unregistered fds
Browse files Browse the repository at this point in the history
Motivation:

Since forever we had a major bug in the Selector: In this condition:

- kqueue/epoll had many events
- in one of the earlier events we unregister a Channel whose fd is on of
  the later events
- we subsequently (still in the same event loop tick) register a new
  channel which gets the same fd as the previously closed one

then we would deliver an event that was meant for a previous channel to
a newly opened one.

Thanks to @mcdappdev for hitting this bug, helping us debug it and also
providing a repeatedly working repro.

Modifications:

if during event delivery any fd gets unregistered, we stop delivering
the remaining events and rely on the selector to redeliver them
again next time.

Result:

we don't deliver events for previously closed channels to new ones.
  • Loading branch information
weissi committed Apr 20, 2018
1 parent 17a2aae commit 755bc7a
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 8 deletions.
23 changes: 18 additions & 5 deletions Sources/NIO/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ private struct SocketChannelLifecycleManager {
// this is queried from the Channel, ie. must be thread-safe
internal let isActiveAtomic: Atomic<Bool>
// these are only to be accessed on the EventLoop

// have we seen the `.readEOF` notification
// note: this can be `false` on a deactivated channel, we might just have torn it down.
var hasSeenEOFNotification: Bool = false

private var currentState: State = .fresh {
didSet {
assert(self.eventLoop.inEventLoop)
Expand Down Expand Up @@ -228,8 +233,8 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
/// Returned by the `private func readable0()` to inform the caller about the current state of the underlying read stream.
/// This is mostly useful when receiving `.readEOF` as we then need to drain the read stream fully (ie. until we receive EOF or error of course)
private enum ReadStreamState {
/// Everything seems normal.
case normal
/// Everything seems normal
case normal(ReadResult)

/// We saw EOF.
case eof
Expand Down Expand Up @@ -776,6 +781,9 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

final func readEOF() {
print("readEOF \(self)")
assert(!self.lifecycleManager.hasSeenEOFNotification)
self.lifecycleManager.hasSeenEOFNotification = true
if self.lifecycleManager.isRegistered {
// we're unregistering from `readEOF` here as we want this to be one-shot. We're then synchronously
// reading all input until the EOF that we're guaranteed to see. After that `readEOF` becomes uninteresting
Expand All @@ -793,7 +801,9 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
assert(!self.lifecycleManager.isActive)
assert(!self.lifecycleManager.isRegistered)
break loop
case .normal:
case .normal(.none):
preconditionFailure("got .readEOF and read returned not reading any bytes, nor EOF.")
case .normal(.some):
// normal, note that there is no guarantee we're still active (as the user might have closed in callout)
continue loop
}
Expand Down Expand Up @@ -831,6 +841,8 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

public final func readable() {
assert(!self.lifecycleManager.hasSeenEOFNotification,
"got a read notification after having already seen .readEOF")
self.readable0()
}

Expand All @@ -845,8 +857,9 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}
}

let readResult: ReadResult
do {
try readFromSocket()
readResult = try readFromSocket()
} catch let err {
let readStreamState: ReadStreamState
// ChannelError.eof is not something we want to fire through the pipeline as it just means the remote
Expand Down Expand Up @@ -885,7 +898,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
pipeline.fireChannelReadComplete0()
}
readIfNeeded0()
return .normal
return .normal(readResult)
}

/// Returns `true` if the `Channel` should be closed as result of the given `Error` which happened during `readFromSocket`.
Expand Down
8 changes: 6 additions & 2 deletions Sources/NIO/Selector.swift
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ final class Selector<R: Registration> {
private var eventsCapacity = 64
private var events: UnsafeMutablePointer<EventType>
private var registrations = [Int: R]()
private var deregistrationsHappened: Bool = false

private static func allocateEventsArray(capacity: Int) -> UnsafeMutablePointer<EventType> {
let events: UnsafeMutablePointer<EventType> = UnsafeMutablePointer.allocate(capacity: capacity)
Expand Down Expand Up @@ -453,6 +454,7 @@ final class Selector<R: Registration> {
guard self.lifecycleState == .open else {
throw IOError(errnoCode: EBADF, reason: "can't deregister from selector as it's \(self.lifecycleState).")
}
self.deregistrationsHappened = true
try selectable.withUnsafeFileDescriptor { fd in
guard let reg = registrations.removeValue(forKey: Int(fd)) else {
return
Expand Down Expand Up @@ -500,7 +502,8 @@ final class Selector<R: Registration> {
ready = Int(try Epoll.epoll_wait(epfd: self.fd, events: events, maxevents: Int32(eventsCapacity), timeout: -1))
}

for i in 0..<ready {
self.deregistrationsHappened = false
for i in 0..<ready where !self.deregistrationsHappened {
let ev = events[i]
switch ev.data.fd {
case eventfd:
Expand Down Expand Up @@ -540,7 +543,8 @@ final class Selector<R: Registration> {
Int(try KQueue.kevent(kq: self.fd, changelist: nil, nchanges: 0, eventlist: events, nevents: Int32(eventsCapacity), timeout: ts))
}

for i in 0..<ready {
self.deregistrationsHappened = false
for i in 0..<ready where !self.deregistrationsHappened {
let ev = events[i]
let filter = Int32(ev.filter)
guard Int32(ev.flags) & EV_ERROR == 0 else {
Expand Down
1 change: 1 addition & 0 deletions Tests/NIOTests/SelectorTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ extension SelectorTest {
return [
("testDeregisterWhileProcessingEvents", testDeregisterWhileProcessingEvents),
("testDeregisterAndCloseWhileProcessingEvents", testDeregisterAndCloseWhileProcessingEvents),
("testWeDoNotDeliverEventsForPreviouslyClosedChannels", testWeDoNotDeliverEventsForPreviouslyClosedChannels),
]
}
}
Expand Down
Loading

0 comments on commit 755bc7a

Please sign in to comment.