From 06a2cb70d5e868c7079a778b86c9e84a46b5b765 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Wed, 25 Nov 2020 11:37:06 +0000 Subject: [PATCH] Improve bulk stream teardown. Motivation: The logic for doing bulk stream teardown in the ConnectionStreamsState was always a bit janky: we'd grab all the stream IDs out of the map, filter out anything that was too small, insert them one-by-one into a circular buffer, and then report back. I basically just translated that directly into StreamMap, but on StreamMap this potentially performs very poorly, as attempting to find the pivot point becomes a linear scan through the stream data even when there are _loads_ of streams. This patch swaps over to add a specialized function for bulk-removal of streams. The specialized function passes the slice of data to be removed to the caller so that they can compute on it. It does this by a closure to try to avoid a CoW where possible. While we're here I tried to address the loop over the CircularBuffer of recently reset streams. This flushed out an awkward bug that has existed in our recently reset streams logic for a very long time, which is that while we _tried_ to maintain the size of the circular buffer, in practice we misunderstood the CircularBuffer invariants and so this never worked. I've added a bunch of unit tests for that logic and fixed it up. These two changes together will improve performance in stream bulk teardown. This is an uncommon case (it only happens on GOAWAY), but it's still good to make that faster. Modifications: - New operation on StreamMap - New prependWithoutExpanding(contentsOf:) operation - Unit tests for the circular buffer and stream map logic. Result: Better performance on bulk stream teardown. --- .../ConnectionStreamsState.swift | 54 +++++-- Sources/NIOHTTP2/StreamMap.swift | 67 +++++++++ Tests/LinuxMain.swift | 1 + ...CircularBufferExtensionsTests+XCTest.swift | 44 ++++++ .../CircularBufferExtensionsTests.swift | 140 ++++++++++++++++++ .../NIOHTTP2Tests/StreamMapTests+XCTest.swift | 2 + Tests/NIOHTTP2Tests/StreamMapTests.swift | 71 +++++++++ docker/docker-compose.1604.51.yaml | 4 +- docker/docker-compose.1804.52.yaml | 4 +- docker/docker-compose.1804.53.yaml | 4 +- 10 files changed, 372 insertions(+), 19 deletions(-) create mode 100644 Tests/NIOHTTP2Tests/CircularBufferExtensionsTests+XCTest.swift create mode 100644 Tests/NIOHTTP2Tests/CircularBufferExtensionsTests.swift diff --git a/Sources/NIOHTTP2/ConnectionStateMachine/ConnectionStreamsState.swift b/Sources/NIOHTTP2/ConnectionStateMachine/ConnectionStreamsState.swift index cb6f29d7..8ddd4f3d 100644 --- a/Sources/NIOHTTP2/ConnectionStateMachine/ConnectionStreamsState.swift +++ b/Sources/NIOHTTP2/ConnectionStateMachine/ConnectionStreamsState.swift @@ -256,27 +256,27 @@ struct ConnectionStreamState { mutating func dropAllStreamsWithIDHigherThan(_ streamID: HTTP2StreamID, droppedLocally: Bool, initiatedBy initiator: HTTP2ConnectionStateMachine.ConnectionRole) -> [HTTP2StreamID]? { - let idsToDrop = self.activeStreams.elements(initiatedBy: initiator).drop(while: {$0.streamID <= streamID }).map { $0.streamID } - guard idsToDrop.count > 0 else { - return nil + var droppedIDs: [HTTP2StreamID] = [] + self.activeStreams.dropDataWithStreamIDGreaterThan(streamID, initiatedBy: initiator) { data in + droppedIDs = data.map { $0.streamID } } - for closingStreamID in idsToDrop { - self.activeStreams.removeValue(forStreamID: closingStreamID) + guard droppedIDs.count > 0 else { + return nil + } - if droppedLocally { - self.recentlyResetStreams.prependWithoutExpanding(closingStreamID) - } + if droppedLocally { + self.recentlyResetStreams.prependWithoutExpanding(contentsOf: droppedIDs) } switch initiator { case .client: - self.clientStreamCount -= UInt32(idsToDrop.count) + self.clientStreamCount -= UInt32(droppedIDs.count) case .server: - self.serverStreamCount -= UInt32(idsToDrop.count) + self.serverStreamCount -= UInt32(droppedIDs.count) } - return idsToDrop + return droppedIDs } /// Determines the state machine result to generate when we've been asked to modify a missing stream. @@ -317,15 +317,43 @@ struct ConnectionStreamState { } -private extension CircularBuffer { +extension CircularBuffer { + // CircularBuffer may never be "full": that is, capacity may never equal count. + var effectiveCapacity: Int { + return self.capacity - 1 + } + /// Prepends `element` without expanding the capacity, by dropping the /// element at the end if necessary. mutating func prependWithoutExpanding(_ element: Element) { - if self.capacity == self.count { + if self.effectiveCapacity == self.count { self.removeLast() } self.prepend(element) } + + // NOTE: this could be generic over RandomAccessCollection if we wanted, I'm just saving code size by defining + // it specifically for now. + mutating func prependWithoutExpanding(contentsOf newElements: [Element]) { + // We're going to need to insert these new elements _backwards_, as though they were inserted + // one at a time. + var newElements = newElements.reversed()[...] + let newElementCount = newElements.count + let freeSpace = self.effectiveCapacity - self.count + + if newElementCount >= self.effectiveCapacity { + // We need to completely replace the storage, and then only insert `self.effectiveCapacity` elements. + self.removeAll(keepingCapacity: true) + newElements = newElements.prefix(self.effectiveCapacity) + } else if newElementCount > freeSpace { + // We need to free up enough space to store everything we need, but some of the old elements will remain. + let elementsToRemove = newElementCount - freeSpace + self.removeLast(elementsToRemove) + } + + assert(newElements.count <= self.effectiveCapacity - self.count) + self.insert(contentsOf: newElements, at: self.startIndex) + } } diff --git a/Sources/NIOHTTP2/StreamMap.swift b/Sources/NIOHTTP2/StreamMap.swift index 51f67659..02ad1c1c 100644 --- a/Sources/NIOHTTP2/StreamMap.swift +++ b/Sources/NIOHTTP2/StreamMap.swift @@ -130,6 +130,29 @@ struct StreamMap { return self.serverInitiated.makeIterator() } } + + /// This is a special case helper for the ConnectionStreamsState, which has to handle GOAWAY. In that case we + /// drop all stream state for a bunch of streams all at once. These streams are guaranteed to be sequential and based at a certain stream ID. + /// + /// It's useful to get that data back too, and helpfully CircularBuffer will let us slice it out. We can't return it though: that will cause the mutation + /// to trigger a CoW. Instead we pass it to the caller in a block, and then do the removal. + /// + /// This helper can turn a complex operation that involves repeated resizing of the base objects into a much faster one that also avoids + /// allocation. + mutating func dropDataWithStreamIDGreaterThan(_ streamID: HTTP2StreamID, + initiatedBy role: HTTP2ConnectionStateMachine.ConnectionRole, + _ block: (CircularBuffer.SubSequence) -> Void) { + switch role { + case .client: + let index = self.clientInitiated.findIndexForFirstStreamIDLargerThan(streamID) + block(self.clientInitiated[index...]) + self.clientInitiated.removeSubrange(index...) + case .server: + let index = self.serverInitiated.findIndexForFirstStreamIDLargerThan(streamID) + block(self.serverInitiated[index...]) + self.serverInitiated.removeSubrange(index...) + } + } } internal extension StreamMap where Element == HTTP2StreamStateMachine { @@ -220,6 +243,50 @@ extension CircularBuffer where Element: PerStreamData { return nil } + + fileprivate func findIndexForFirstStreamIDLargerThan(_ streamID: HTTP2StreamID) -> Index { + if self.count < binarySearchThreshold { + return self.linearScanForFirstStreamIDLargerThan(streamID) + } else { + return self.binarySearchForFirstStreamIDLargerThan(streamID) + } + } + + private func linearScanForFirstStreamIDLargerThan(_ streamID: HTTP2StreamID) -> Index { + var index = self.startIndex + + while index < self.endIndex { + let currentStreamID = self[index].streamID + if currentStreamID > streamID { + return index + } + self.formIndex(after: &index) + } + + return self.endIndex + } + + // Binary search is somewhat complex code compared to a linear scan, so we don't want to inline this code if we can avoid it. + @inline(never) + private func binarySearchForFirstStreamIDLargerThan(_ streamID: HTTP2StreamID) -> Index { + var left = self.startIndex + var right = self.endIndex + + while left != right { + let pivot = self.index(left, offsetBy: self.distance(from: left, to: right) / 2) + let currentStreamID = self[pivot].streamID + if currentStreamID > streamID { + right = pivot + } else if currentStreamID == streamID { + // The stream ID is the one after. + return self.index(after: pivot) + } else { + left = self.index(after: pivot) + } + } + + return left + } } extension CircularBuffer where Element == HTTP2StreamStateMachine { diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index 55b4b708..3ed32c40 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -35,6 +35,7 @@ class LinuxMainRunnerImpl: LinuxMainRunner { @available(*, deprecated, message: "not actually deprecated. Just deprecated to allow deprecated tests (which test deprecated functionality) without warnings") func run() { XCTMain([ + testCase(CircularBufferExtensionsTests.allTests), testCase(CompoundOutboundBufferTest.allTests), testCase(ConcurrentStreamBufferTests.allTests), testCase(ConfiguringPipelineTests.allTests), diff --git a/Tests/NIOHTTP2Tests/CircularBufferExtensionsTests+XCTest.swift b/Tests/NIOHTTP2Tests/CircularBufferExtensionsTests+XCTest.swift new file mode 100644 index 00000000..c6ac70e0 --- /dev/null +++ b/Tests/NIOHTTP2Tests/CircularBufferExtensionsTests+XCTest.swift @@ -0,0 +1,44 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +// +// CircularBufferExtensionsTests+XCTest.swift +// +import XCTest + +/// +/// NOTE: This file was generated by generate_linux_tests.rb +/// +/// Do NOT edit this file directly as it will be regenerated automatically when needed. +/// + +extension CircularBufferExtensionsTests { + + @available(*, deprecated, message: "not actually deprecated. Just deprecated to allow deprecated tests (which test deprecated functionality) without warnings") + static var allTests : [(String, (CircularBufferExtensionsTests) -> () throws -> Void)] { + return [ + ("testPrependFromEmpty", testPrependFromEmpty), + ("testPrependWithSpace", testPrependWithSpace), + ("testPrependWithExactlyEnoughSpace", testPrependWithExactlyEnoughSpace), + ("testPrependWithoutEnoughSpace", testPrependWithoutEnoughSpace), + ("testPrependContentsOfWithLotsOfSpace", testPrependContentsOfWithLotsOfSpace), + ("testPrependContentsOfWithExactlyTheSpace", testPrependContentsOfWithExactlyTheSpace), + ("testPrependContentsOfWithEnoughSpaceIfWeRemoveAnElement", testPrependContentsOfWithEnoughSpaceIfWeRemoveAnElement), + ("testPrependContentsOfWithEnoughSpaceIfWeRemoveEverything", testPrependContentsOfWithEnoughSpaceIfWeRemoveEverything), + ("testPrependContentsOfWithoutEnoughSpaceButContainingElements", testPrependContentsOfWithoutEnoughSpaceButContainingElements), + ("testPrependContentsOfWithExactlyTheSpaceFromEmpty", testPrependContentsOfWithExactlyTheSpaceFromEmpty), + ("testPrependContentsOfWithoutEnoughSpaceFromEmpty", testPrependContentsOfWithoutEnoughSpaceFromEmpty), + ] + } +} + diff --git a/Tests/NIOHTTP2Tests/CircularBufferExtensionsTests.swift b/Tests/NIOHTTP2Tests/CircularBufferExtensionsTests.swift new file mode 100644 index 00000000..da03065d --- /dev/null +++ b/Tests/NIOHTTP2Tests/CircularBufferExtensionsTests.swift @@ -0,0 +1,140 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2020 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import XCTest +import NIO +@testable import NIOHTTP2 + +final class CircularBufferExtensionsTests: XCTestCase { + func testPrependFromEmpty() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.prependWithoutExpanding(1) + XCTAssertEqual(Array(buffer), [1]) + XCTAssertEqual(buffer.count, 1) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependWithSpace() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: [1, 2, 3]) + buffer.prependWithoutExpanding(4) + XCTAssertEqual(Array(buffer), [4, 1, 2, 3]) + XCTAssertEqual(buffer.count, 4) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependWithExactlyEnoughSpace() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: 1..<15) + buffer.prependWithoutExpanding(15) + XCTAssertEqual(Array(buffer), [15, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependWithoutEnoughSpace() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: 1..<16) + XCTAssertEqual(Array(buffer), [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + + + buffer.prependWithoutExpanding(17) + XCTAssertEqual(Array(buffer), [17, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependContentsOfWithLotsOfSpace() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: [1, 2, 3]) + XCTAssertEqual(Array(buffer), [1, 2, 3]) + XCTAssertEqual(buffer.count, 3) + XCTAssertEqual(buffer.effectiveCapacity, 15) + + buffer.prependWithoutExpanding(contentsOf: [4, 5, 6]) + XCTAssertEqual(Array(buffer), [6, 5, 4, 1, 2, 3]) + XCTAssertEqual(buffer.count, 6) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependContentsOfWithExactlyTheSpace() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: [1, 2, 3]) + XCTAssertEqual(Array(buffer), [1, 2, 3]) + XCTAssertEqual(buffer.count, 3) + XCTAssertEqual(buffer.effectiveCapacity, 15) + + buffer.prependWithoutExpanding(contentsOf: Array(4..<16)) + XCTAssertEqual(Array(buffer), [15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 1, 2, 3]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependContentsOfWithEnoughSpaceIfWeRemoveAnElement() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: [1, 2, 3]) + XCTAssertEqual(Array(buffer), [1, 2, 3]) + XCTAssertEqual(buffer.count, 3) + XCTAssertEqual(buffer.effectiveCapacity, 15) + + buffer.prependWithoutExpanding(contentsOf: Array(4..<17)) + XCTAssertEqual(Array(buffer), [16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 1, 2]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependContentsOfWithEnoughSpaceIfWeRemoveEverything() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: [1, 2, 3]) + XCTAssertEqual(Array(buffer), [1, 2, 3]) + XCTAssertEqual(buffer.count, 3) + XCTAssertEqual(buffer.effectiveCapacity, 15) + + buffer.prependWithoutExpanding(contentsOf: Array(4..<19)) + XCTAssertEqual(Array(buffer), [18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependContentsOfWithoutEnoughSpaceButContainingElements() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: [1, 2, 3]) + XCTAssertEqual(Array(buffer), [1, 2, 3]) + XCTAssertEqual(buffer.count, 3) + XCTAssertEqual(buffer.effectiveCapacity, 15) + + buffer.prependWithoutExpanding(contentsOf: Array(4..<32)) + XCTAssertEqual(Array(buffer), [31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependContentsOfWithExactlyTheSpaceFromEmpty() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.prependWithoutExpanding(contentsOf: Array(1..<16)) + XCTAssertEqual(Array(buffer), [15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependContentsOfWithoutEnoughSpaceFromEmpty() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.prependWithoutExpanding(contentsOf: Array(1..<32)) + XCTAssertEqual(Array(buffer), [31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } +} diff --git a/Tests/NIOHTTP2Tests/StreamMapTests+XCTest.swift b/Tests/NIOHTTP2Tests/StreamMapTests+XCTest.swift index ea879c0c..82beb236 100644 --- a/Tests/NIOHTTP2Tests/StreamMapTests+XCTest.swift +++ b/Tests/NIOHTTP2Tests/StreamMapTests+XCTest.swift @@ -34,6 +34,8 @@ extension StreamMapTests { ("testCanFindElements", testCanFindElements), ("testRemoval", testRemoval), ("testModifySpecificValue", testModifySpecificValue), + ("testDroppingAllStreamIDsGreaterThanLinear", testDroppingAllStreamIDsGreaterThanLinear), + ("testDroppingAllStreamIDsGreaterThanBinarySearch", testDroppingAllStreamIDsGreaterThanBinarySearch), ] } } diff --git a/Tests/NIOHTTP2Tests/StreamMapTests.swift b/Tests/NIOHTTP2Tests/StreamMapTests.swift index 8a076fda..547d5727 100644 --- a/Tests/NIOHTTP2Tests/StreamMapTests.swift +++ b/Tests/NIOHTTP2Tests/StreamMapTests.swift @@ -169,6 +169,77 @@ final class StreamMapTests: XCTestCase { XCTAssertNil(map.modify(streamID: 101) { _ in XCTFail("must not execute") }) XCTAssertNil(map.modify(streamID: 102) { _ in XCTFail("must not execute") }) } + + func testDroppingAllStreamIDsGreaterThanLinear() throws { + for streamIDToDropFrom in HTTP2StreamID(1)..() + + for id in 1..<100 { + map.insert(StreamData(id)) + } + + for id in 1..<100 { + XCTAssertTrue(map.contains(streamID: HTTP2StreamID(id))) + } + + map.dropDataWithStreamIDGreaterThan(streamIDToDropFrom, initiatedBy: .client) { droppedStreams in + XCTAssertEqual(droppedStreams.map { $0.streamID }, (streamIDToDropFrom.advanced(by: 1)..() + + for id in 1..<500 { + map.insert(StreamData(id)) + } + + // If we do every stream ID this test takes way too damn long. Skip a bunch by iterating in 55s (odd numbers are good for ensuring we hit things that exist in both sides). + for streamIDToDropFrom in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(500), by: 55) { + var map = map + + for id in 1..<500 { + XCTAssertTrue(map.contains(streamID: HTTP2StreamID(id))) + } + + map.dropDataWithStreamIDGreaterThan(streamIDToDropFrom, initiatedBy: .client) { droppedStreams in + XCTAssertEqual(droppedStreams.map { $0.streamID }, (streamIDToDropFrom.advanced(by: 1)..