Skip to content

Commit

Permalink
eagerly process input EOF/connection resets (fixes #277)
Browse files Browse the repository at this point in the history
Motivation:

We had a number of problems:

1. We wanted to lazily process input EOFs and connection resets only
   when the user actually calls `read()`. On Linux however you cannot
   unsubscribe from `EPOLLHUP` so that's not possible.
2. Lazily processing input EOFs/connection resets wastes kernel
   resources and that could potentially lead to a DOS
3. The very low-level `Selector` interpreted the eventing mechanism's
   events quite a lot so the `EventLoop`/`Channel` only ever saw
   `readable` or `writable` without further information what exactly
   happened.
4. We completely ignored `EPOLLHUP` until now which on Unix Domain
   Socket close leads to a 100% CPU spin (issue #277)

Modifications:

- made the `Selector` interface richer, it now sends the following
  events: `readable`, `writable`, `readEOF` (input EOF), `reset`
  (connection reset or some error)
- process input EOFs and connection resets/errors eagerly
- change all tests which relied on using unconnected and unbound sockets
  to user connected/bound ones as `epoll_wait` otherwise would keep
  sending us a stream of `EPOLLHUP`s which would now lead to an eager
  close

Result:

- most importantly: fix issue #277
- waste less kernel resources (by dealing with input EOFs/connection
  resets eagerly)
- bring kqueue/epoll more in line
  • Loading branch information
weissi committed Apr 16, 2018
1 parent 7758140 commit d06a5c1
Show file tree
Hide file tree
Showing 19 changed files with 744 additions and 289 deletions.
14 changes: 12 additions & 2 deletions IntegrationTests/tests_01_http/defines.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ function create_token() {
}

function start_server() {
local extra_args=''
if [[ "$1" == "--disable-half-closure" ]]; then
extra_args="$1"
shift
fi
local token="$1"
local type="--uds"
local port="$tmp/port.sock"
Expand All @@ -41,7 +46,7 @@ function start_server() {

mkdir "$tmp/htdocs"
swift build
"$(swift build --show-bin-path)/NIOHTTP1Server" $maybe_nio_host "$port" "$tmp/htdocs" &
"$(swift build --show-bin-path)/NIOHTTP1Server" $extra_args $maybe_nio_host "$port" "$tmp/htdocs" &
tmp_server_pid=$!
if [[ -z "$type" ]]; then
# TCP mode, need to wait until we found a port that we can curl
Expand All @@ -68,7 +73,7 @@ function start_server() {
echo "curl port: $curl_port"
echo "local token_port; local token_htdocs; local token_pid;" >> "$token"
echo " token_port='$port'; token_htdocs='$tmp/htdocs'; token_pid='$!';" >> "$token"
echo " token_type='$tok_type';" >> "$token"
echo " token_type='$tok_type'; token_server_ip='$maybe_nio_host'" >> "$token"
tmp_server_pid=$(get_server_pid "$token")
echo "local token_open_fds" >> "$token"
echo "token_open_fds='$(server_lsof "$tmp_server_pid" | wc -l)'" >> "$token"
Expand Down Expand Up @@ -121,6 +126,11 @@ function get_server_port() {
echo "$token_port"
}

function get_server_ip() {
source "$1"
echo "$token_server_ip"
}

function do_curl() {
source "$1"
shift
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/bin/bash
##===----------------------------------------------------------------------===##
##
## This source file is part of the SwiftNIO open source project
##
## Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
## Licensed under Apache License v2.0
##
## See LICENSE.txt for license information
## See CONTRIBUTORS.txt for the list of SwiftNIO project authors
##
## SPDX-License-Identifier: Apache-2.0
##
##===----------------------------------------------------------------------===##

source defines.sh

token=$(create_token)
start_server --disable-half-closure "$token"
server_pid=$(get_server_pid "$token")
socket=$(get_socket "$token")

kill -0 "$server_pid"
echo -e 'GET /dynamic/write-delay/10000 HTTP/1.1\r\n\r\n' | nc -w1 -U "$socket"
sleep 0.2

# note: the way this test would fail is to leak file descriptors (ie. have some
# connections still open 0.2s after the request terminated). `stop_server`
# checks for that, hence there aren't any explicit asserts in here.
stop_server "$token"
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash
##===----------------------------------------------------------------------===##
##
## This source file is part of the SwiftNIO open source project
##
## Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
## Licensed under Apache License v2.0
##
## See LICENSE.txt for license information
## See CONTRIBUTORS.txt for the list of SwiftNIO project authors
##
## SPDX-License-Identifier: Apache-2.0
##
##===----------------------------------------------------------------------===##

source defines.sh

token=$(create_token)
start_server --disable-half-closure "$token" tcp
htdocs=$(get_htdocs "$token")
server_pid=$(get_server_pid "$token")
ip=$(get_server_ip "$token")
port=$(get_server_port "$token")

kill -0 $server_pid
echo -e 'GET /dynamic/write-delay/10000 HTTP/1.1\r\n\r\n' | nc -w1 "$ip" "$port"
sleep 0.2
stop_server "$token"
32 changes: 32 additions & 0 deletions IntegrationTests/tests_01_http/test_21_connection_reset_tcp.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/bin/bash
##===----------------------------------------------------------------------===##
##
## This source file is part of the SwiftNIO open source project
##
## Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
## Licensed under Apache License v2.0
##
## See LICENSE.txt for license information
## See CONTRIBUTORS.txt for the list of SwiftNIO project authors
##
## SPDX-License-Identifier: Apache-2.0
##
##===----------------------------------------------------------------------===##

source defines.sh

token=$(create_token)
start_server --disable-half-closure "$token" tcp
htdocs=$(get_htdocs "$token")
server_pid=$(get_server_pid "$token")
ip=$(get_server_ip "$token")
port=$(get_server_port "$token")

kill -0 $server_pid
# try to simulate a TCP connection reset, works really well on Darwin but not on
# Linux over loopback. On Linux however
# `test_19_connection_drop_while_waiting_for_response_uds.sh` tests a very
# similar situation.
yes "$( echo -e 'GET /dynamic/write-delay HTTP/1.1\r\n\r\n')" | nc "$ip" "$port" > /dev/null & sleep 0.5; kill -9 $!
sleep 0.2
stop_server "$token"
6 changes: 3 additions & 3 deletions Sources/NIO/BaseSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
//
//===----------------------------------------------------------------------===//

/// A Registration on a `Selector`, which is interested in an `IOEvent`.
/// A Registration on a `Selector`, which is interested in an `SelectorEventSet`.
protocol Registration {
/// The `IOEvent` in which the `Registration` is interested.
var interested: IOEvent { get set }
/// The `SelectorEventSet` in which the `Registration` is interested.
var interested: SelectorEventSet { get set }
}

protocol SockAddrProtocol {
Expand Down
128 changes: 89 additions & 39 deletions Sources/NIO/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,11 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
private let isActiveAtomic: Atomic<Bool> = Atomic(value: false)
private var _pipeline: ChannelPipeline! = nil // this is really a constant (set in .init) but needs `self` to be constructed and therefore a `var`. Do not change as this needs to accessed from arbitrary threads

internal var interestedEvent: IOEvent = .none
internal var interestedEvent: SelectorEventSet = [.readEOF, .reset] {
didSet {
assert(self.interestedEvent.contains(.reset), "impossible to unregister for reset")
}
}

var readPending = false
var pendingConnect: EventLoopPromise<Void>?
Expand Down Expand Up @@ -221,6 +225,19 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
case some
}

/// Returned by the `private func readable0()` to inform the caller about the current state of the underlying read stream.
/// This is mostly useful when receiving `.readEOF` as we then need to drain the read stream fully (ie. until we receive EOF or error of course)
private enum ReadStreamState {
/// Everything seems normal.
case normal

/// We saw EOF.
case eof

/// A read error was received.
case error
}

// MARK: Computed Properties
public final var _unsafe: ChannelCore { return self }

Expand Down Expand Up @@ -288,7 +305,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

/// Provides the registration for this selector. Must be implemented by subclasses.
func registrationFor(interested: IOEvent) -> NIORegistration {
func registrationFor(interested: SelectorEventSet) -> NIORegistration {
fatalError("must override")
}

Expand Down Expand Up @@ -545,26 +562,21 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
private func registerForWritable() {
assert(eventLoop.inEventLoop)

switch interestedEvent {
case .read:
safeReregister(interested: .all)
case .none:
safeReregister(interested: .write)
case .write, .all:
break
guard !self.interestedEvent.contains(.write) else {
// nothing to do if we were previously interested in write
return
}
self.safeReregister(interested: self.interestedEvent.union(.write))
}

func unregisterForWritable() {
assert(eventLoop.inEventLoop)
switch interestedEvent {
case .all:
safeReregister(interested: .read)
case .write:
safeReregister(interested: .none)
case .read, .none:
break

guard self.interestedEvent.contains(.write) else {
// nothing to do if we were not previously interested in write
return
}
self.safeReregister(interested: self.interestedEvent.subtracting(.write))
}

public final func flush0() {
Expand Down Expand Up @@ -611,28 +623,22 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
assert(eventLoop.inEventLoop)
assert(self.lifecycleManager.isRegistered)

switch interestedEvent {
case .write:
safeReregister(interested: .all)
case .none:
safeReregister(interested: .read)
case .read, .all:
break
guard !self.interestedEvent.contains(.read) else {
return
}

self.safeReregister(interested: self.interestedEvent.union(.read))
}

func unregisterForReadable() {
assert(eventLoop.inEventLoop)
assert(self.lifecycleManager.isRegistered)

switch interestedEvent {
case .read:
safeReregister(interested: .none)
case .all:
safeReregister(interested: .write)
case .write, .none:
break
guard self.interestedEvent.contains(.read) else {
return
}

self.safeReregister(interested: self.interestedEvent.subtracting(.read))
}

public func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
Expand All @@ -648,7 +654,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
return
}

interestedEvent = .none
self.interestedEvent = .reset
do {
try selectableEventLoop.deregister(channel: self)
} catch let err {
Expand Down Expand Up @@ -705,7 +711,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
// Was not registered yet so do it now.
do {
// We always register with interested .none and will just trigger readIfNeeded0() later to re-register if needed.
try self.safeRegister(interested: .none)
try self.safeRegister(interested: [.readEOF, .reset])
self.lifecycleManager.register(promise: promise)(self.pipeline)
} catch {
promise?.fail(error: error)
Expand Down Expand Up @@ -761,7 +767,47 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}
}

final func readEOF() {
if self.lifecycleManager.isRegistered {
// we're unregistering from `readEOF` here as we want this to be one-shot. We're then synchronously
// reading all input until the EOF that we're guaranteed to see. After that `readEOF` becomes uninteresting
// and would anyway fire constantly.
self.safeReregister(interested: self.interestedEvent.subtracting(.readEOF))

loop: while self.lifecycleManager.isActive {
switch self.readable0() {
case .eof:
// on EOF we stop the loop and we're done with our processing for `readEOF`.
// we could both be registered & active (if our channel supports half-closure) or unregistered & inactive (if it doesn't).
break loop
case .error:
// we should be unregistered and inactive now (as `readable0` would've called close).
assert(!self.lifecycleManager.isActive)
assert(!self.lifecycleManager.isRegistered)
break loop
case .normal:
// normal, note that there is no guarantee we're still active (as the user might have closed in callout)
continue loop
}
}
}
}

// this _needs_ to synchronously cause the fd to be unregistered because we cannot unregister from `reset`. In
// other words: Failing to unregister the whole selector will cause NIO to spin at 100% CPU constantly delivering
// the `reset` event.
final func reset() {
self.readEOF()
self.close0(error: ChannelError.eof, mode: .all, promise: nil)
assert(!self.lifecycleManager.isRegistered)
}

public final func readable() {
self.readable0()
}

@discardableResult
private final func readable0() -> ReadStreamState {
assert(eventLoop.inEventLoop)
assert(self.lifecycleManager.isActive)

Expand All @@ -774,11 +820,13 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
do {
try readFromSocket()
} catch let err {
let readStreamState: ReadStreamState
// ChannelError.eof is not something we want to fire through the pipeline as it just means the remote
// peer closed / shutdown the connection.
if let channelErr = err as? ChannelError, channelErr == ChannelError.eof {
readStreamState = .eof
// Directly call getOption0 as we are already on the EventLoop and so not need to create an extra future.
if try! getOption0(option: ChannelOptions.allowRemoteHalfClosure) {
if self.lifecycleManager.isActive, try! getOption0(option: ChannelOptions.allowRemoteHalfClosure) {
// If we want to allow half closure we will just mark the input side of the Channel
// as closed.
assert(self.lifecycleManager.isActive)
Expand All @@ -787,9 +835,10 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
close0(error: err, mode: .input, promise: nil)
}
readPending = false
return
return .eof
}
} else {
readStreamState = .error
self.pipeline.fireErrorCaught0(error: err)
}

Expand All @@ -802,12 +851,13 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
self.close0(error: err, mode: .all, promise: nil)
}

return
return readStreamState
}
if self.lifecycleManager.isActive {
pipeline.fireChannelReadComplete0()
}
readIfNeeded0()
return .normal
}

/// Returns `true` if the `Channel` should be closed as result of the given `Error` which happened during `readFromSocket`.
Expand Down Expand Up @@ -881,15 +931,15 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

private func isWritePending() -> Bool {
return interestedEvent == .write || interestedEvent == .all
return self.interestedEvent.contains(.write)
}

private func safeReregister(interested: IOEvent) {
private final func safeReregister(interested: SelectorEventSet) {
assert(eventLoop.inEventLoop)
assert(self.lifecycleManager.isRegistered)

guard self.isOpen else {
interestedEvent = .none
assert(self.interestedEvent == .reset, "interestedEvent=\(self.interestedEvent) event though we're closed")
return
}
if interested == interestedEvent {
Expand All @@ -905,7 +955,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}
}

private func safeRegister(interested: IOEvent) throws {
private func safeRegister(interested: SelectorEventSet) throws {
assert(eventLoop.inEventLoop)
assert(!self.lifecycleManager.isRegistered)

Expand Down
Loading

0 comments on commit d06a5c1

Please sign in to comment.