diff --git a/Sources/NIOHTTP2/ConnectionStateMachine/ConnectionStreamsState.swift b/Sources/NIOHTTP2/ConnectionStateMachine/ConnectionStreamsState.swift index cb6f29d7..8ddd4f3d 100644 --- a/Sources/NIOHTTP2/ConnectionStateMachine/ConnectionStreamsState.swift +++ b/Sources/NIOHTTP2/ConnectionStateMachine/ConnectionStreamsState.swift @@ -256,27 +256,27 @@ struct ConnectionStreamState { mutating func dropAllStreamsWithIDHigherThan(_ streamID: HTTP2StreamID, droppedLocally: Bool, initiatedBy initiator: HTTP2ConnectionStateMachine.ConnectionRole) -> [HTTP2StreamID]? { - let idsToDrop = self.activeStreams.elements(initiatedBy: initiator).drop(while: {$0.streamID <= streamID }).map { $0.streamID } - guard idsToDrop.count > 0 else { - return nil + var droppedIDs: [HTTP2StreamID] = [] + self.activeStreams.dropDataWithStreamIDGreaterThan(streamID, initiatedBy: initiator) { data in + droppedIDs = data.map { $0.streamID } } - for closingStreamID in idsToDrop { - self.activeStreams.removeValue(forStreamID: closingStreamID) + guard droppedIDs.count > 0 else { + return nil + } - if droppedLocally { - self.recentlyResetStreams.prependWithoutExpanding(closingStreamID) - } + if droppedLocally { + self.recentlyResetStreams.prependWithoutExpanding(contentsOf: droppedIDs) } switch initiator { case .client: - self.clientStreamCount -= UInt32(idsToDrop.count) + self.clientStreamCount -= UInt32(droppedIDs.count) case .server: - self.serverStreamCount -= UInt32(idsToDrop.count) + self.serverStreamCount -= UInt32(droppedIDs.count) } - return idsToDrop + return droppedIDs } /// Determines the state machine result to generate when we've been asked to modify a missing stream. @@ -317,15 +317,43 @@ struct ConnectionStreamState { } -private extension CircularBuffer { +extension CircularBuffer { + // CircularBuffer may never be "full": that is, capacity may never equal count. + var effectiveCapacity: Int { + return self.capacity - 1 + } + /// Prepends `element` without expanding the capacity, by dropping the /// element at the end if necessary. mutating func prependWithoutExpanding(_ element: Element) { - if self.capacity == self.count { + if self.effectiveCapacity == self.count { self.removeLast() } self.prepend(element) } + + // NOTE: this could be generic over RandomAccessCollection if we wanted, I'm just saving code size by defining + // it specifically for now. + mutating func prependWithoutExpanding(contentsOf newElements: [Element]) { + // We're going to need to insert these new elements _backwards_, as though they were inserted + // one at a time. + var newElements = newElements.reversed()[...] + let newElementCount = newElements.count + let freeSpace = self.effectiveCapacity - self.count + + if newElementCount >= self.effectiveCapacity { + // We need to completely replace the storage, and then only insert `self.effectiveCapacity` elements. + self.removeAll(keepingCapacity: true) + newElements = newElements.prefix(self.effectiveCapacity) + } else if newElementCount > freeSpace { + // We need to free up enough space to store everything we need, but some of the old elements will remain. + let elementsToRemove = newElementCount - freeSpace + self.removeLast(elementsToRemove) + } + + assert(newElements.count <= self.effectiveCapacity - self.count) + self.insert(contentsOf: newElements, at: self.startIndex) + } } diff --git a/Sources/NIOHTTP2/StreamMap.swift b/Sources/NIOHTTP2/StreamMap.swift index 51f67659..02ad1c1c 100644 --- a/Sources/NIOHTTP2/StreamMap.swift +++ b/Sources/NIOHTTP2/StreamMap.swift @@ -130,6 +130,29 @@ struct StreamMap { return self.serverInitiated.makeIterator() } } + + /// This is a special case helper for the ConnectionStreamsState, which has to handle GOAWAY. In that case we + /// drop all stream state for a bunch of streams all at once. These streams are guaranteed to be sequential and based at a certain stream ID. + /// + /// It's useful to get that data back too, and helpfully CircularBuffer will let us slice it out. We can't return it though: that will cause the mutation + /// to trigger a CoW. Instead we pass it to the caller in a block, and then do the removal. + /// + /// This helper can turn a complex operation that involves repeated resizing of the base objects into a much faster one that also avoids + /// allocation. + mutating func dropDataWithStreamIDGreaterThan(_ streamID: HTTP2StreamID, + initiatedBy role: HTTP2ConnectionStateMachine.ConnectionRole, + _ block: (CircularBuffer.SubSequence) -> Void) { + switch role { + case .client: + let index = self.clientInitiated.findIndexForFirstStreamIDLargerThan(streamID) + block(self.clientInitiated[index...]) + self.clientInitiated.removeSubrange(index...) + case .server: + let index = self.serverInitiated.findIndexForFirstStreamIDLargerThan(streamID) + block(self.serverInitiated[index...]) + self.serverInitiated.removeSubrange(index...) + } + } } internal extension StreamMap where Element == HTTP2StreamStateMachine { @@ -220,6 +243,50 @@ extension CircularBuffer where Element: PerStreamData { return nil } + + fileprivate func findIndexForFirstStreamIDLargerThan(_ streamID: HTTP2StreamID) -> Index { + if self.count < binarySearchThreshold { + return self.linearScanForFirstStreamIDLargerThan(streamID) + } else { + return self.binarySearchForFirstStreamIDLargerThan(streamID) + } + } + + private func linearScanForFirstStreamIDLargerThan(_ streamID: HTTP2StreamID) -> Index { + var index = self.startIndex + + while index < self.endIndex { + let currentStreamID = self[index].streamID + if currentStreamID > streamID { + return index + } + self.formIndex(after: &index) + } + + return self.endIndex + } + + // Binary search is somewhat complex code compared to a linear scan, so we don't want to inline this code if we can avoid it. + @inline(never) + private func binarySearchForFirstStreamIDLargerThan(_ streamID: HTTP2StreamID) -> Index { + var left = self.startIndex + var right = self.endIndex + + while left != right { + let pivot = self.index(left, offsetBy: self.distance(from: left, to: right) / 2) + let currentStreamID = self[pivot].streamID + if currentStreamID > streamID { + right = pivot + } else if currentStreamID == streamID { + // The stream ID is the one after. + return self.index(after: pivot) + } else { + left = self.index(after: pivot) + } + } + + return left + } } extension CircularBuffer where Element == HTTP2StreamStateMachine { diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index 55b4b708..3ed32c40 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -35,6 +35,7 @@ class LinuxMainRunnerImpl: LinuxMainRunner { @available(*, deprecated, message: "not actually deprecated. Just deprecated to allow deprecated tests (which test deprecated functionality) without warnings") func run() { XCTMain([ + testCase(CircularBufferExtensionsTests.allTests), testCase(CompoundOutboundBufferTest.allTests), testCase(ConcurrentStreamBufferTests.allTests), testCase(ConfiguringPipelineTests.allTests), diff --git a/Tests/NIOHTTP2Tests/CircularBufferExtensionsTests+XCTest.swift b/Tests/NIOHTTP2Tests/CircularBufferExtensionsTests+XCTest.swift new file mode 100644 index 00000000..c6ac70e0 --- /dev/null +++ b/Tests/NIOHTTP2Tests/CircularBufferExtensionsTests+XCTest.swift @@ -0,0 +1,44 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +// +// CircularBufferExtensionsTests+XCTest.swift +// +import XCTest + +/// +/// NOTE: This file was generated by generate_linux_tests.rb +/// +/// Do NOT edit this file directly as it will be regenerated automatically when needed. +/// + +extension CircularBufferExtensionsTests { + + @available(*, deprecated, message: "not actually deprecated. Just deprecated to allow deprecated tests (which test deprecated functionality) without warnings") + static var allTests : [(String, (CircularBufferExtensionsTests) -> () throws -> Void)] { + return [ + ("testPrependFromEmpty", testPrependFromEmpty), + ("testPrependWithSpace", testPrependWithSpace), + ("testPrependWithExactlyEnoughSpace", testPrependWithExactlyEnoughSpace), + ("testPrependWithoutEnoughSpace", testPrependWithoutEnoughSpace), + ("testPrependContentsOfWithLotsOfSpace", testPrependContentsOfWithLotsOfSpace), + ("testPrependContentsOfWithExactlyTheSpace", testPrependContentsOfWithExactlyTheSpace), + ("testPrependContentsOfWithEnoughSpaceIfWeRemoveAnElement", testPrependContentsOfWithEnoughSpaceIfWeRemoveAnElement), + ("testPrependContentsOfWithEnoughSpaceIfWeRemoveEverything", testPrependContentsOfWithEnoughSpaceIfWeRemoveEverything), + ("testPrependContentsOfWithoutEnoughSpaceButContainingElements", testPrependContentsOfWithoutEnoughSpaceButContainingElements), + ("testPrependContentsOfWithExactlyTheSpaceFromEmpty", testPrependContentsOfWithExactlyTheSpaceFromEmpty), + ("testPrependContentsOfWithoutEnoughSpaceFromEmpty", testPrependContentsOfWithoutEnoughSpaceFromEmpty), + ] + } +} + diff --git a/Tests/NIOHTTP2Tests/CircularBufferExtensionsTests.swift b/Tests/NIOHTTP2Tests/CircularBufferExtensionsTests.swift new file mode 100644 index 00000000..da03065d --- /dev/null +++ b/Tests/NIOHTTP2Tests/CircularBufferExtensionsTests.swift @@ -0,0 +1,140 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2020 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import XCTest +import NIO +@testable import NIOHTTP2 + +final class CircularBufferExtensionsTests: XCTestCase { + func testPrependFromEmpty() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.prependWithoutExpanding(1) + XCTAssertEqual(Array(buffer), [1]) + XCTAssertEqual(buffer.count, 1) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependWithSpace() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: [1, 2, 3]) + buffer.prependWithoutExpanding(4) + XCTAssertEqual(Array(buffer), [4, 1, 2, 3]) + XCTAssertEqual(buffer.count, 4) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependWithExactlyEnoughSpace() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: 1..<15) + buffer.prependWithoutExpanding(15) + XCTAssertEqual(Array(buffer), [15, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependWithoutEnoughSpace() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: 1..<16) + XCTAssertEqual(Array(buffer), [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + + + buffer.prependWithoutExpanding(17) + XCTAssertEqual(Array(buffer), [17, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependContentsOfWithLotsOfSpace() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: [1, 2, 3]) + XCTAssertEqual(Array(buffer), [1, 2, 3]) + XCTAssertEqual(buffer.count, 3) + XCTAssertEqual(buffer.effectiveCapacity, 15) + + buffer.prependWithoutExpanding(contentsOf: [4, 5, 6]) + XCTAssertEqual(Array(buffer), [6, 5, 4, 1, 2, 3]) + XCTAssertEqual(buffer.count, 6) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependContentsOfWithExactlyTheSpace() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: [1, 2, 3]) + XCTAssertEqual(Array(buffer), [1, 2, 3]) + XCTAssertEqual(buffer.count, 3) + XCTAssertEqual(buffer.effectiveCapacity, 15) + + buffer.prependWithoutExpanding(contentsOf: Array(4..<16)) + XCTAssertEqual(Array(buffer), [15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 1, 2, 3]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependContentsOfWithEnoughSpaceIfWeRemoveAnElement() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: [1, 2, 3]) + XCTAssertEqual(Array(buffer), [1, 2, 3]) + XCTAssertEqual(buffer.count, 3) + XCTAssertEqual(buffer.effectiveCapacity, 15) + + buffer.prependWithoutExpanding(contentsOf: Array(4..<17)) + XCTAssertEqual(Array(buffer), [16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 1, 2]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependContentsOfWithEnoughSpaceIfWeRemoveEverything() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: [1, 2, 3]) + XCTAssertEqual(Array(buffer), [1, 2, 3]) + XCTAssertEqual(buffer.count, 3) + XCTAssertEqual(buffer.effectiveCapacity, 15) + + buffer.prependWithoutExpanding(contentsOf: Array(4..<19)) + XCTAssertEqual(Array(buffer), [18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependContentsOfWithoutEnoughSpaceButContainingElements() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.append(contentsOf: [1, 2, 3]) + XCTAssertEqual(Array(buffer), [1, 2, 3]) + XCTAssertEqual(buffer.count, 3) + XCTAssertEqual(buffer.effectiveCapacity, 15) + + buffer.prependWithoutExpanding(contentsOf: Array(4..<32)) + XCTAssertEqual(Array(buffer), [31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependContentsOfWithExactlyTheSpaceFromEmpty() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.prependWithoutExpanding(contentsOf: Array(1..<16)) + XCTAssertEqual(Array(buffer), [15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } + + func testPrependContentsOfWithoutEnoughSpaceFromEmpty() { + var buffer = CircularBuffer(initialCapacity: 16) + buffer.prependWithoutExpanding(contentsOf: Array(1..<32)) + XCTAssertEqual(Array(buffer), [31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17]) + XCTAssertEqual(buffer.count, 15) + XCTAssertEqual(buffer.effectiveCapacity, 15) + } +} diff --git a/Tests/NIOHTTP2Tests/StreamMapTests+XCTest.swift b/Tests/NIOHTTP2Tests/StreamMapTests+XCTest.swift index ea879c0c..82beb236 100644 --- a/Tests/NIOHTTP2Tests/StreamMapTests+XCTest.swift +++ b/Tests/NIOHTTP2Tests/StreamMapTests+XCTest.swift @@ -34,6 +34,8 @@ extension StreamMapTests { ("testCanFindElements", testCanFindElements), ("testRemoval", testRemoval), ("testModifySpecificValue", testModifySpecificValue), + ("testDroppingAllStreamIDsGreaterThanLinear", testDroppingAllStreamIDsGreaterThanLinear), + ("testDroppingAllStreamIDsGreaterThanBinarySearch", testDroppingAllStreamIDsGreaterThanBinarySearch), ] } } diff --git a/Tests/NIOHTTP2Tests/StreamMapTests.swift b/Tests/NIOHTTP2Tests/StreamMapTests.swift index 8a076fda..547d5727 100644 --- a/Tests/NIOHTTP2Tests/StreamMapTests.swift +++ b/Tests/NIOHTTP2Tests/StreamMapTests.swift @@ -169,6 +169,77 @@ final class StreamMapTests: XCTestCase { XCTAssertNil(map.modify(streamID: 101) { _ in XCTFail("must not execute") }) XCTAssertNil(map.modify(streamID: 102) { _ in XCTFail("must not execute") }) } + + func testDroppingAllStreamIDsGreaterThanLinear() throws { + for streamIDToDropFrom in HTTP2StreamID(1)..() + + for id in 1..<100 { + map.insert(StreamData(id)) + } + + for id in 1..<100 { + XCTAssertTrue(map.contains(streamID: HTTP2StreamID(id))) + } + + map.dropDataWithStreamIDGreaterThan(streamIDToDropFrom, initiatedBy: .client) { droppedStreams in + XCTAssertEqual(droppedStreams.map { $0.streamID }, (streamIDToDropFrom.advanced(by: 1)..() + + for id in 1..<500 { + map.insert(StreamData(id)) + } + + // If we do every stream ID this test takes way too damn long. Skip a bunch by iterating in 55s (odd numbers are good for ensuring we hit things that exist in both sides). + for streamIDToDropFrom in stride(from: HTTP2StreamID(1), to: HTTP2StreamID(500), by: 55) { + var map = map + + for id in 1..<500 { + XCTAssertTrue(map.contains(streamID: HTTP2StreamID(id))) + } + + map.dropDataWithStreamIDGreaterThan(streamIDToDropFrom, initiatedBy: .client) { droppedStreams in + XCTAssertEqual(droppedStreams.map { $0.streamID }, (streamIDToDropFrom.advanced(by: 1)..