Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eagerly process input EOF/connection resets (fixes #277) #286

Merged
merged 1 commit into from
Apr 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions IntegrationTests/tests_01_http/defines.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ function create_token() {
}

function start_server() {
local extra_args=''
if [[ "$1" == "--disable-half-closure" ]]; then
extra_args="$1"
shift
fi
local token="$1"
local type="--uds"
local port="$tmp/port.sock"
Expand All @@ -41,7 +46,7 @@ function start_server() {

mkdir "$tmp/htdocs"
swift build
"$(swift build --show-bin-path)/NIOHTTP1Server" $maybe_nio_host "$port" "$tmp/htdocs" &
"$(swift build --show-bin-path)/NIOHTTP1Server" $extra_args $maybe_nio_host "$port" "$tmp/htdocs" &
tmp_server_pid=$!
if [[ -z "$type" ]]; then
# TCP mode, need to wait until we found a port that we can curl
Expand All @@ -68,7 +73,7 @@ function start_server() {
echo "curl port: $curl_port"
echo "local token_port; local token_htdocs; local token_pid;" >> "$token"
echo " token_port='$port'; token_htdocs='$tmp/htdocs'; token_pid='$!';" >> "$token"
echo " token_type='$tok_type';" >> "$token"
echo " token_type='$tok_type'; token_server_ip='$maybe_nio_host'" >> "$token"
tmp_server_pid=$(get_server_pid "$token")
echo "local token_open_fds" >> "$token"
echo "token_open_fds='$(server_lsof "$tmp_server_pid" | wc -l)'" >> "$token"
Expand Down Expand Up @@ -121,6 +126,11 @@ function get_server_port() {
echo "$token_port"
}

function get_server_ip() {
source "$1"
echo "$token_server_ip"
}

function do_curl() {
source "$1"
shift
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/bin/bash
##===----------------------------------------------------------------------===##
##
## This source file is part of the SwiftNIO open source project
##
## Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
## Licensed under Apache License v2.0
##
## See LICENSE.txt for license information
## See CONTRIBUTORS.txt for the list of SwiftNIO project authors
##
## SPDX-License-Identifier: Apache-2.0
##
##===----------------------------------------------------------------------===##

source defines.sh

token=$(create_token)
start_server --disable-half-closure "$token"
server_pid=$(get_server_pid "$token")
socket=$(get_socket "$token")

kill -0 "$server_pid"
echo -e 'GET /dynamic/write-delay/10000 HTTP/1.1\r\n\r\n' | nc -w1 -U "$socket"
sleep 0.2

# note: the way this test would fail is to leak file descriptors (ie. have some
# connections still open 0.2s after the request terminated). `stop_server`
# checks for that, hence there aren't any explicit asserts in here.
stop_server "$token"
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash
##===----------------------------------------------------------------------===##
##
## This source file is part of the SwiftNIO open source project
##
## Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
## Licensed under Apache License v2.0
##
## See LICENSE.txt for license information
## See CONTRIBUTORS.txt for the list of SwiftNIO project authors
##
## SPDX-License-Identifier: Apache-2.0
##
##===----------------------------------------------------------------------===##

source defines.sh

token=$(create_token)
start_server --disable-half-closure "$token" tcp
htdocs=$(get_htdocs "$token")
server_pid=$(get_server_pid "$token")
ip=$(get_server_ip "$token")
port=$(get_server_port "$token")

kill -0 $server_pid
echo -e 'GET /dynamic/write-delay/10000 HTTP/1.1\r\n\r\n' | nc -w1 "$ip" "$port"
sleep 0.2
stop_server "$token"
32 changes: 32 additions & 0 deletions IntegrationTests/tests_01_http/test_21_connection_reset_tcp.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/bin/bash
##===----------------------------------------------------------------------===##
##
## This source file is part of the SwiftNIO open source project
##
## Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
## Licensed under Apache License v2.0
##
## See LICENSE.txt for license information
## See CONTRIBUTORS.txt for the list of SwiftNIO project authors
##
## SPDX-License-Identifier: Apache-2.0
##
##===----------------------------------------------------------------------===##

source defines.sh

token=$(create_token)
start_server --disable-half-closure "$token" tcp
htdocs=$(get_htdocs "$token")
server_pid=$(get_server_pid "$token")
ip=$(get_server_ip "$token")
port=$(get_server_port "$token")

kill -0 $server_pid
# try to simulate a TCP connection reset, works really well on Darwin but not on
# Linux over loopback. On Linux however
# `test_19_connection_drop_while_waiting_for_response_uds.sh` tests a very
# similar situation.
yes "$( echo -e 'GET /dynamic/write-delay HTTP/1.1\r\n\r\n')" | nc "$ip" "$port" > /dev/null & sleep 0.5; kill -9 $!
sleep 0.2
stop_server "$token"
6 changes: 3 additions & 3 deletions Sources/NIO/BaseSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
//
//===----------------------------------------------------------------------===//

/// A Registration on a `Selector`, which is interested in an `IOEvent`.
/// A Registration on a `Selector`, which is interested in an `SelectorEventSet`.
protocol Registration {
/// The `IOEvent` in which the `Registration` is interested.
var interested: IOEvent { get set }
/// The `SelectorEventSet` in which the `Registration` is interested.
var interested: SelectorEventSet { get set }
}

protocol SockAddrProtocol {
Expand Down
128 changes: 89 additions & 39 deletions Sources/NIO/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,11 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
private let isActiveAtomic: Atomic<Bool> = Atomic(value: false)
private var _pipeline: ChannelPipeline! = nil // this is really a constant (set in .init) but needs `self` to be constructed and therefore a `var`. Do not change as this needs to accessed from arbitrary threads

internal var interestedEvent: IOEvent = .none
internal var interestedEvent: SelectorEventSet = [.readEOF, .reset] {
didSet {
assert(self.interestedEvent.contains(.reset), "impossible to unregister for reset")
}
}

var readPending = false
var pendingConnect: EventLoopPromise<Void>?
Expand Down Expand Up @@ -221,6 +225,19 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
case some
}

/// Returned by the `private func readable0()` to inform the caller about the current state of the underlying read stream.
/// This is mostly useful when receiving `.readEOF` as we then need to drain the read stream fully (ie. until we receive EOF or error of course)
private enum ReadStreamState {
/// Everything seems normal.
case normal

/// We saw EOF.
case eof

/// A read error was received.
case error
}

// MARK: Computed Properties
public final var _unsafe: ChannelCore { return self }

Expand Down Expand Up @@ -288,7 +305,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

/// Provides the registration for this selector. Must be implemented by subclasses.
func registrationFor(interested: IOEvent) -> NIORegistration {
func registrationFor(interested: SelectorEventSet) -> NIORegistration {
fatalError("must override")
}

Expand Down Expand Up @@ -545,26 +562,21 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
private func registerForWritable() {
assert(eventLoop.inEventLoop)

switch interestedEvent {
case .read:
safeReregister(interested: .all)
case .none:
safeReregister(interested: .write)
case .write, .all:
break
guard !self.interestedEvent.contains(.write) else {
// nothing to do if we were previously interested in write
return
}
self.safeReregister(interested: self.interestedEvent.union(.write))
}

func unregisterForWritable() {
assert(eventLoop.inEventLoop)
switch interestedEvent {
case .all:
safeReregister(interested: .read)
case .write:
safeReregister(interested: .none)
case .read, .none:
break

guard self.interestedEvent.contains(.write) else {
// nothing to do if we were not previously interested in write
return
}
self.safeReregister(interested: self.interestedEvent.subtracting(.write))
}

public final func flush0() {
Expand Down Expand Up @@ -611,28 +623,22 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
assert(eventLoop.inEventLoop)
assert(self.lifecycleManager.isRegistered)

switch interestedEvent {
case .write:
safeReregister(interested: .all)
case .none:
safeReregister(interested: .read)
case .read, .all:
break
guard !self.interestedEvent.contains(.read) else {
return
}

self.safeReregister(interested: self.interestedEvent.union(.read))
}

func unregisterForReadable() {
assert(eventLoop.inEventLoop)
assert(self.lifecycleManager.isRegistered)

switch interestedEvent {
case .read:
safeReregister(interested: .none)
case .all:
safeReregister(interested: .write)
case .write, .none:
break
guard self.interestedEvent.contains(.read) else {
return
}

self.safeReregister(interested: self.interestedEvent.subtracting(.read))
}

public func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
Expand All @@ -648,7 +654,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
return
}

interestedEvent = .none
self.interestedEvent = .reset
do {
try selectableEventLoop.deregister(channel: self)
} catch let err {
Expand Down Expand Up @@ -705,7 +711,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
// Was not registered yet so do it now.
do {
// We always register with interested .none and will just trigger readIfNeeded0() later to re-register if needed.
try self.safeRegister(interested: .none)
try self.safeRegister(interested: [.readEOF, .reset])
self.lifecycleManager.register(promise: promise)(self.pipeline)
} catch {
promise?.fail(error: error)
Expand Down Expand Up @@ -761,7 +767,47 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}
}

final func readEOF() {
if self.lifecycleManager.isRegistered {
// we're unregistering from `readEOF` here as we want this to be one-shot. We're then synchronously
// reading all input until the EOF that we're guaranteed to see. After that `readEOF` becomes uninteresting
// and would anyway fire constantly.
self.safeReregister(interested: self.interestedEvent.subtracting(.readEOF))

loop: while self.lifecycleManager.isActive {
switch self.readable0() {
case .eof:
// on EOF we stop the loop and we're done with our processing for `readEOF`.
// we could both be registered & active (if our channel supports half-closure) or unregistered & inactive (if it doesn't).
break loop
case .error:
// we should be unregistered and inactive now (as `readable0` would've called close).
assert(!self.lifecycleManager.isActive)
assert(!self.lifecycleManager.isRegistered)
break loop
case .normal:
// normal, note that there is no guarantee we're still active (as the user might have closed in callout)
continue loop
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think you could just make this break

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@normanmaurer but then the code would be really hard to read, no? continue loop is really clear, so is break loop. But break meaning 'break the switch & continue the loop' is incredibly hard to spot.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

honestly I dont think its harder to read but I dont care to much about if you change it or not... Thats why its a nit ;)

}
}
}
}

// this _needs_ to synchronously cause the fd to be unregistered because we cannot unregister from `reset`. In
// other words: Failing to unregister the whole selector will cause NIO to spin at 100% CPU constantly delivering
// the `reset` event.
final func reset() {
self.readEOF()
self.close0(error: ChannelError.eof, mode: .all, promise: nil)
assert(!self.lifecycleManager.isRegistered)
}

public final func readable() {
self.readable0()
}

@discardableResult
private final func readable0() -> ReadStreamState {
assert(eventLoop.inEventLoop)
assert(self.lifecycleManager.isActive)

Expand All @@ -774,11 +820,13 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
do {
try readFromSocket()
} catch let err {
let readStreamState: ReadStreamState
// ChannelError.eof is not something we want to fire through the pipeline as it just means the remote
// peer closed / shutdown the connection.
if let channelErr = err as? ChannelError, channelErr == ChannelError.eof {
readStreamState = .eof
// Directly call getOption0 as we are already on the EventLoop and so not need to create an extra future.
if try! getOption0(option: ChannelOptions.allowRemoteHalfClosure) {
if self.lifecycleManager.isActive, try! getOption0(option: ChannelOptions.allowRemoteHalfClosure) {
// If we want to allow half closure we will just mark the input side of the Channel
// as closed.
assert(self.lifecycleManager.isActive)
Expand All @@ -787,9 +835,10 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
close0(error: err, mode: .input, promise: nil)
}
readPending = false
return
return .eof
}
} else {
readStreamState = .error
self.pipeline.fireErrorCaught0(error: err)
}

Expand All @@ -802,12 +851,13 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
self.close0(error: err, mode: .all, promise: nil)
}

return
return readStreamState
}
if self.lifecycleManager.isActive {
pipeline.fireChannelReadComplete0()
}
readIfNeeded0()
return .normal
}

/// Returns `true` if the `Channel` should be closed as result of the given `Error` which happened during `readFromSocket`.
Expand Down Expand Up @@ -881,15 +931,15 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

private func isWritePending() -> Bool {
return interestedEvent == .write || interestedEvent == .all
return self.interestedEvent.contains(.write)
}

private func safeReregister(interested: IOEvent) {
private final func safeReregister(interested: SelectorEventSet) {
assert(eventLoop.inEventLoop)
assert(self.lifecycleManager.isRegistered)

guard self.isOpen else {
interestedEvent = .none
assert(self.interestedEvent == .reset, "interestedEvent=\(self.interestedEvent) event though we're closed")
return
}
if interested == interestedEvent {
Expand All @@ -905,7 +955,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}
}

private func safeRegister(interested: IOEvent) throws {
private func safeRegister(interested: SelectorEventSet) throws {
assert(eventLoop.inEventLoop)
assert(!self.lifecycleManager.isRegistered)

Expand Down
Loading