Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve bulk stream teardown. #261

Merged
merged 2 commits into from
Nov 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -256,27 +256,27 @@ struct ConnectionStreamState {
mutating func dropAllStreamsWithIDHigherThan(_ streamID: HTTP2StreamID,
droppedLocally: Bool,
initiatedBy initiator: HTTP2ConnectionStateMachine.ConnectionRole) -> [HTTP2StreamID]? {
let idsToDrop = self.activeStreams.elements(initiatedBy: initiator).drop(while: {$0.streamID <= streamID }).map { $0.streamID }
guard idsToDrop.count > 0 else {
return nil
var droppedIDs: [HTTP2StreamID] = []
self.activeStreams.dropDataWithStreamIDGreaterThan(streamID, initiatedBy: initiator) { data in
droppedIDs = data.map { $0.streamID }
}

for closingStreamID in idsToDrop {
self.activeStreams.removeValue(forStreamID: closingStreamID)
guard droppedIDs.count > 0 else {
return nil
}

if droppedLocally {
self.recentlyResetStreams.prependWithoutExpanding(closingStreamID)
}
if droppedLocally {
self.recentlyResetStreams.prependWithoutExpanding(contentsOf: droppedIDs)
}

switch initiator {
case .client:
self.clientStreamCount -= UInt32(idsToDrop.count)
self.clientStreamCount -= UInt32(droppedIDs.count)
case .server:
self.serverStreamCount -= UInt32(idsToDrop.count)
self.serverStreamCount -= UInt32(droppedIDs.count)
}

return idsToDrop
return droppedIDs
}

/// Determines the state machine result to generate when we've been asked to modify a missing stream.
Expand Down Expand Up @@ -317,15 +317,43 @@ struct ConnectionStreamState {
}


private extension CircularBuffer {
extension CircularBuffer {
// CircularBuffer may never be "full": that is, capacity may never equal count.
var effectiveCapacity: Int {
return self.capacity - 1
}

/// Prepends `element` without expanding the capacity, by dropping the
/// element at the end if necessary.
mutating func prependWithoutExpanding(_ element: Element) {
if self.capacity == self.count {
if self.effectiveCapacity == self.count {
self.removeLast()
}
self.prepend(element)
}

// NOTE: this could be generic over RandomAccessCollection if we wanted, I'm just saving code size by defining
// it specifically for now.
mutating func prependWithoutExpanding(contentsOf newElements: [Element]) {
// We're going to need to insert these new elements _backwards_, as though they were inserted
// one at a time.
var newElements = newElements.reversed()[...]
let newElementCount = newElements.count
let freeSpace = self.effectiveCapacity - self.count

if newElementCount >= self.effectiveCapacity {
// We need to completely replace the storage, and then only insert `self.effectiveCapacity` elements.
self.removeAll(keepingCapacity: true)
newElements = newElements.prefix(self.effectiveCapacity)
} else if newElementCount > freeSpace {
// We need to free up enough space to store everything we need, but some of the old elements will remain.
let elementsToRemove = newElementCount - freeSpace
self.removeLast(elementsToRemove)
}

assert(newElements.count <= self.effectiveCapacity - self.count)
self.insert(contentsOf: newElements, at: self.startIndex)
}
}


Expand Down
67 changes: 67 additions & 0 deletions Sources/NIOHTTP2/StreamMap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,29 @@ struct StreamMap<Element: PerStreamData> {
return self.serverInitiated.makeIterator()
}
}

/// This is a special case helper for the ConnectionStreamsState, which has to handle GOAWAY. In that case we
/// drop all stream state for a bunch of streams all at once. These streams are guaranteed to be sequential and based at a certain stream ID.
///
/// It's useful to get that data back too, and helpfully CircularBuffer will let us slice it out. We can't return it though: that will cause the mutation
/// to trigger a CoW. Instead we pass it to the caller in a block, and then do the removal.
///
/// This helper can turn a complex operation that involves repeated resizing of the base objects into a much faster one that also avoids
/// allocation.
mutating func dropDataWithStreamIDGreaterThan(_ streamID: HTTP2StreamID,
initiatedBy role: HTTP2ConnectionStateMachine.ConnectionRole,
_ block: (CircularBuffer<Element>.SubSequence) -> Void) {
switch role {
case .client:
let index = self.clientInitiated.findIndexForFirstStreamIDLargerThan(streamID)
block(self.clientInitiated[index...])
self.clientInitiated.removeSubrange(index...)
case .server:
let index = self.serverInitiated.findIndexForFirstStreamIDLargerThan(streamID)
block(self.serverInitiated[index...])
self.serverInitiated.removeSubrange(index...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The duplication in places like this does look pretty sad.

}
}
}

internal extension StreamMap where Element == HTTP2StreamStateMachine {
Expand Down Expand Up @@ -220,6 +243,50 @@ extension CircularBuffer where Element: PerStreamData {

return nil
}

fileprivate func findIndexForFirstStreamIDLargerThan(_ streamID: HTTP2StreamID) -> Index {
if self.count < binarySearchThreshold {
return self.linearScanForFirstStreamIDLargerThan(streamID)
} else {
return self.binarySearchForFirstStreamIDLargerThan(streamID)
}
}

private func linearScanForFirstStreamIDLargerThan(_ streamID: HTTP2StreamID) -> Index {
var index = self.startIndex

while index < self.endIndex {
let currentStreamID = self[index].streamID
if currentStreamID > streamID {
return index
}
self.formIndex(after: &index)
}

return self.endIndex
}

// Binary search is somewhat complex code compared to a linear scan, so we don't want to inline this code if we can avoid it.
@inline(never)
private func binarySearchForFirstStreamIDLargerThan(_ streamID: HTTP2StreamID) -> Index {
var left = self.startIndex
var right = self.endIndex

while left != right {
let pivot = self.index(left, offsetBy: self.distance(from: left, to: right) / 2)
let currentStreamID = self[pivot].streamID
if currentStreamID > streamID {
right = pivot
} else if currentStreamID == streamID {
// The stream ID is the one after.
return self.index(after: pivot)
} else {
left = self.index(after: pivot)
}
}

return left
}
}

extension CircularBuffer where Element == HTTP2StreamStateMachine {
Expand Down
1 change: 1 addition & 0 deletions Tests/LinuxMain.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class LinuxMainRunnerImpl: LinuxMainRunner {
@available(*, deprecated, message: "not actually deprecated. Just deprecated to allow deprecated tests (which test deprecated functionality) without warnings")
func run() {
XCTMain([
testCase(CircularBufferExtensionsTests.allTests),
testCase(CompoundOutboundBufferTest.allTests),
testCase(ConcurrentStreamBufferTests.allTests),
testCase(ConfiguringPipelineTests.allTests),
Expand Down
44 changes: 44 additions & 0 deletions Tests/NIOHTTP2Tests/CircularBufferExtensionsTests+XCTest.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
//
// CircularBufferExtensionsTests+XCTest.swift
//
import XCTest

///
/// NOTE: This file was generated by generate_linux_tests.rb
///
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
///

extension CircularBufferExtensionsTests {

@available(*, deprecated, message: "not actually deprecated. Just deprecated to allow deprecated tests (which test deprecated functionality) without warnings")
static var allTests : [(String, (CircularBufferExtensionsTests) -> () throws -> Void)] {
return [
("testPrependFromEmpty", testPrependFromEmpty),
("testPrependWithSpace", testPrependWithSpace),
("testPrependWithExactlyEnoughSpace", testPrependWithExactlyEnoughSpace),
("testPrependWithoutEnoughSpace", testPrependWithoutEnoughSpace),
("testPrependContentsOfWithLotsOfSpace", testPrependContentsOfWithLotsOfSpace),
("testPrependContentsOfWithExactlyTheSpace", testPrependContentsOfWithExactlyTheSpace),
("testPrependContentsOfWithEnoughSpaceIfWeRemoveAnElement", testPrependContentsOfWithEnoughSpaceIfWeRemoveAnElement),
("testPrependContentsOfWithEnoughSpaceIfWeRemoveEverything", testPrependContentsOfWithEnoughSpaceIfWeRemoveEverything),
("testPrependContentsOfWithoutEnoughSpaceButContainingElements", testPrependContentsOfWithoutEnoughSpaceButContainingElements),
("testPrependContentsOfWithExactlyTheSpaceFromEmpty", testPrependContentsOfWithExactlyTheSpaceFromEmpty),
("testPrependContentsOfWithoutEnoughSpaceFromEmpty", testPrependContentsOfWithoutEnoughSpaceFromEmpty),
]
}
}

140 changes: 140 additions & 0 deletions Tests/NIOHTTP2Tests/CircularBufferExtensionsTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2020 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import XCTest
import NIO
@testable import NIOHTTP2

final class CircularBufferExtensionsTests: XCTestCase {
func testPrependFromEmpty() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.prependWithoutExpanding(1)
XCTAssertEqual(Array(buffer), [1])
XCTAssertEqual(buffer.count, 1)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependWithSpace() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: [1, 2, 3])
buffer.prependWithoutExpanding(4)
XCTAssertEqual(Array(buffer), [4, 1, 2, 3])
XCTAssertEqual(buffer.count, 4)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependWithExactlyEnoughSpace() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: 1..<15)
buffer.prependWithoutExpanding(15)
XCTAssertEqual(Array(buffer), [15, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependWithoutEnoughSpace() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: 1..<16)
XCTAssertEqual(Array(buffer), [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)


buffer.prependWithoutExpanding(17)
XCTAssertEqual(Array(buffer), [17, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependContentsOfWithLotsOfSpace() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: [1, 2, 3])
XCTAssertEqual(Array(buffer), [1, 2, 3])
XCTAssertEqual(buffer.count, 3)
XCTAssertEqual(buffer.effectiveCapacity, 15)

buffer.prependWithoutExpanding(contentsOf: [4, 5, 6])
XCTAssertEqual(Array(buffer), [6, 5, 4, 1, 2, 3])
XCTAssertEqual(buffer.count, 6)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependContentsOfWithExactlyTheSpace() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: [1, 2, 3])
XCTAssertEqual(Array(buffer), [1, 2, 3])
XCTAssertEqual(buffer.count, 3)
XCTAssertEqual(buffer.effectiveCapacity, 15)

buffer.prependWithoutExpanding(contentsOf: Array(4..<16))
XCTAssertEqual(Array(buffer), [15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 1, 2, 3])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependContentsOfWithEnoughSpaceIfWeRemoveAnElement() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: [1, 2, 3])
XCTAssertEqual(Array(buffer), [1, 2, 3])
XCTAssertEqual(buffer.count, 3)
XCTAssertEqual(buffer.effectiveCapacity, 15)

buffer.prependWithoutExpanding(contentsOf: Array(4..<17))
XCTAssertEqual(Array(buffer), [16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 1, 2])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependContentsOfWithEnoughSpaceIfWeRemoveEverything() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: [1, 2, 3])
XCTAssertEqual(Array(buffer), [1, 2, 3])
XCTAssertEqual(buffer.count, 3)
XCTAssertEqual(buffer.effectiveCapacity, 15)

buffer.prependWithoutExpanding(contentsOf: Array(4..<19))
XCTAssertEqual(Array(buffer), [18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependContentsOfWithoutEnoughSpaceButContainingElements() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: [1, 2, 3])
XCTAssertEqual(Array(buffer), [1, 2, 3])
XCTAssertEqual(buffer.count, 3)
XCTAssertEqual(buffer.effectiveCapacity, 15)

buffer.prependWithoutExpanding(contentsOf: Array(4..<32))
XCTAssertEqual(Array(buffer), [31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependContentsOfWithExactlyTheSpaceFromEmpty() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.prependWithoutExpanding(contentsOf: Array(1..<16))
XCTAssertEqual(Array(buffer), [15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependContentsOfWithoutEnoughSpaceFromEmpty() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.prependWithoutExpanding(contentsOf: Array(1..<32))
XCTAssertEqual(Array(buffer), [31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}
}
2 changes: 2 additions & 0 deletions Tests/NIOHTTP2Tests/StreamMapTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ extension StreamMapTests {
("testCanFindElements", testCanFindElements),
("testRemoval", testRemoval),
("testModifySpecificValue", testModifySpecificValue),
("testDroppingAllStreamIDsGreaterThanLinear", testDroppingAllStreamIDsGreaterThanLinear),
("testDroppingAllStreamIDsGreaterThanBinarySearch", testDroppingAllStreamIDsGreaterThanBinarySearch),
]
}
}
Expand Down
Loading