Skip to content

Commit

Permalink
Improve bulk stream teardown.
Browse files Browse the repository at this point in the history
Motivation:

The logic for doing bulk stream teardown in the ConnectionStreamsState
was always a bit janky: we'd grab all the stream IDs out of the map,
filter out anything that was too small, insert them one-by-one into a
circular buffer, and then report back. I basically just translated that
directly into StreamMap, but on StreamMap this potentially performs very
poorly, as attempting to find the pivot point becomes a linear scan
through the stream data even when there are _loads_ of streams.

This patch swaps over to add a specialized function for bulk-removal of
streams. The specialized function passes the slice of data to be removed
to the caller so that they can compute on it. It does this by a closure
to try to avoid a CoW where possible.

While we're here I tried to address the loop over the CircularBuffer of
recently reset streams. This flushed out an awkward bug that has existed
in our recently reset streams logic for a very long time, which is that
while we _tried_ to maintain the size of the circular buffer, in
practice we misunderstood the CircularBuffer invariants and so this
never worked. I've added a bunch of unit tests for that logic and fixed
it up.

These two changes together will improve performance in stream bulk
teardown. This is an uncommon case (it only happens on GOAWAY), but it's
still good to make that faster.

Modifications:

- New operation on StreamMap
- New prependWithoutExpanding(contentsOf:) operation
- Unit tests for the circular buffer and stream map logic.

Result:

Better performance on bulk stream teardown.
  • Loading branch information
Lukasa committed Nov 25, 2020
1 parent 3315a63 commit 06a2cb7
Show file tree
Hide file tree
Showing 10 changed files with 372 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,27 +256,27 @@ struct ConnectionStreamState {
mutating func dropAllStreamsWithIDHigherThan(_ streamID: HTTP2StreamID,
droppedLocally: Bool,
initiatedBy initiator: HTTP2ConnectionStateMachine.ConnectionRole) -> [HTTP2StreamID]? {
let idsToDrop = self.activeStreams.elements(initiatedBy: initiator).drop(while: {$0.streamID <= streamID }).map { $0.streamID }
guard idsToDrop.count > 0 else {
return nil
var droppedIDs: [HTTP2StreamID] = []
self.activeStreams.dropDataWithStreamIDGreaterThan(streamID, initiatedBy: initiator) { data in
droppedIDs = data.map { $0.streamID }
}

for closingStreamID in idsToDrop {
self.activeStreams.removeValue(forStreamID: closingStreamID)
guard droppedIDs.count > 0 else {
return nil
}

if droppedLocally {
self.recentlyResetStreams.prependWithoutExpanding(closingStreamID)
}
if droppedLocally {
self.recentlyResetStreams.prependWithoutExpanding(contentsOf: droppedIDs)
}

switch initiator {
case .client:
self.clientStreamCount -= UInt32(idsToDrop.count)
self.clientStreamCount -= UInt32(droppedIDs.count)
case .server:
self.serverStreamCount -= UInt32(idsToDrop.count)
self.serverStreamCount -= UInt32(droppedIDs.count)
}

return idsToDrop
return droppedIDs
}

/// Determines the state machine result to generate when we've been asked to modify a missing stream.
Expand Down Expand Up @@ -317,15 +317,43 @@ struct ConnectionStreamState {
}


private extension CircularBuffer {
extension CircularBuffer {
// CircularBuffer may never be "full": that is, capacity may never equal count.
var effectiveCapacity: Int {
return self.capacity - 1
}

/// Prepends `element` without expanding the capacity, by dropping the
/// element at the end if necessary.
mutating func prependWithoutExpanding(_ element: Element) {
if self.capacity == self.count {
if self.effectiveCapacity == self.count {
self.removeLast()
}
self.prepend(element)
}

// NOTE: this could be generic over RandomAccessCollection if we wanted, I'm just saving code size by defining
// it specifically for now.
mutating func prependWithoutExpanding(contentsOf newElements: [Element]) {
// We're going to need to insert these new elements _backwards_, as though they were inserted
// one at a time.
var newElements = newElements.reversed()[...]
let newElementCount = newElements.count
let freeSpace = self.effectiveCapacity - self.count

if newElementCount >= self.effectiveCapacity {
// We need to completely replace the storage, and then only insert `self.effectiveCapacity` elements.
self.removeAll(keepingCapacity: true)
newElements = newElements.prefix(self.effectiveCapacity)
} else if newElementCount > freeSpace {
// We need to free up enough space to store everything we need, but some of the old elements will remain.
let elementsToRemove = newElementCount - freeSpace
self.removeLast(elementsToRemove)
}

assert(newElements.count <= self.effectiveCapacity - self.count)
self.insert(contentsOf: newElements, at: self.startIndex)
}
}


Expand Down
67 changes: 67 additions & 0 deletions Sources/NIOHTTP2/StreamMap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,29 @@ struct StreamMap<Element: PerStreamData> {
return self.serverInitiated.makeIterator()
}
}

/// This is a special case helper for the ConnectionStreamsState, which has to handle GOAWAY. In that case we
/// drop all stream state for a bunch of streams all at once. These streams are guaranteed to be sequential and based at a certain stream ID.
///
/// It's useful to get that data back too, and helpfully CircularBuffer will let us slice it out. We can't return it though: that will cause the mutation
/// to trigger a CoW. Instead we pass it to the caller in a block, and then do the removal.
///
/// This helper can turn a complex operation that involves repeated resizing of the base objects into a much faster one that also avoids
/// allocation.
mutating func dropDataWithStreamIDGreaterThan(_ streamID: HTTP2StreamID,
initiatedBy role: HTTP2ConnectionStateMachine.ConnectionRole,
_ block: (CircularBuffer<Element>.SubSequence) -> Void) {
switch role {
case .client:
let index = self.clientInitiated.findIndexForFirstStreamIDLargerThan(streamID)
block(self.clientInitiated[index...])
self.clientInitiated.removeSubrange(index...)
case .server:
let index = self.serverInitiated.findIndexForFirstStreamIDLargerThan(streamID)
block(self.serverInitiated[index...])
self.serverInitiated.removeSubrange(index...)
}
}
}

internal extension StreamMap where Element == HTTP2StreamStateMachine {
Expand Down Expand Up @@ -220,6 +243,50 @@ extension CircularBuffer where Element: PerStreamData {

return nil
}

fileprivate func findIndexForFirstStreamIDLargerThan(_ streamID: HTTP2StreamID) -> Index {
if self.count < binarySearchThreshold {
return self.linearScanForFirstStreamIDLargerThan(streamID)
} else {
return self.binarySearchForFirstStreamIDLargerThan(streamID)
}
}

private func linearScanForFirstStreamIDLargerThan(_ streamID: HTTP2StreamID) -> Index {
var index = self.startIndex

while index < self.endIndex {
let currentStreamID = self[index].streamID
if currentStreamID > streamID {
return index
}
self.formIndex(after: &index)
}

return self.endIndex
}

// Binary search is somewhat complex code compared to a linear scan, so we don't want to inline this code if we can avoid it.
@inline(never)
private func binarySearchForFirstStreamIDLargerThan(_ streamID: HTTP2StreamID) -> Index {
var left = self.startIndex
var right = self.endIndex

while left != right {
let pivot = self.index(left, offsetBy: self.distance(from: left, to: right) / 2)
let currentStreamID = self[pivot].streamID
if currentStreamID > streamID {
right = pivot
} else if currentStreamID == streamID {
// The stream ID is the one after.
return self.index(after: pivot)
} else {
left = self.index(after: pivot)
}
}

return left
}
}

extension CircularBuffer where Element == HTTP2StreamStateMachine {
Expand Down
1 change: 1 addition & 0 deletions Tests/LinuxMain.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class LinuxMainRunnerImpl: LinuxMainRunner {
@available(*, deprecated, message: "not actually deprecated. Just deprecated to allow deprecated tests (which test deprecated functionality) without warnings")
func run() {
XCTMain([
testCase(CircularBufferExtensionsTests.allTests),
testCase(CompoundOutboundBufferTest.allTests),
testCase(ConcurrentStreamBufferTests.allTests),
testCase(ConfiguringPipelineTests.allTests),
Expand Down
44 changes: 44 additions & 0 deletions Tests/NIOHTTP2Tests/CircularBufferExtensionsTests+XCTest.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
//
// CircularBufferExtensionsTests+XCTest.swift
//
import XCTest

///
/// NOTE: This file was generated by generate_linux_tests.rb
///
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
///

extension CircularBufferExtensionsTests {

@available(*, deprecated, message: "not actually deprecated. Just deprecated to allow deprecated tests (which test deprecated functionality) without warnings")
static var allTests : [(String, (CircularBufferExtensionsTests) -> () throws -> Void)] {
return [
("testPrependFromEmpty", testPrependFromEmpty),
("testPrependWithSpace", testPrependWithSpace),
("testPrependWithExactlyEnoughSpace", testPrependWithExactlyEnoughSpace),
("testPrependWithoutEnoughSpace", testPrependWithoutEnoughSpace),
("testPrependContentsOfWithLotsOfSpace", testPrependContentsOfWithLotsOfSpace),
("testPrependContentsOfWithExactlyTheSpace", testPrependContentsOfWithExactlyTheSpace),
("testPrependContentsOfWithEnoughSpaceIfWeRemoveAnElement", testPrependContentsOfWithEnoughSpaceIfWeRemoveAnElement),
("testPrependContentsOfWithEnoughSpaceIfWeRemoveEverything", testPrependContentsOfWithEnoughSpaceIfWeRemoveEverything),
("testPrependContentsOfWithoutEnoughSpaceButContainingElements", testPrependContentsOfWithoutEnoughSpaceButContainingElements),
("testPrependContentsOfWithExactlyTheSpaceFromEmpty", testPrependContentsOfWithExactlyTheSpaceFromEmpty),
("testPrependContentsOfWithoutEnoughSpaceFromEmpty", testPrependContentsOfWithoutEnoughSpaceFromEmpty),
]
}
}

140 changes: 140 additions & 0 deletions Tests/NIOHTTP2Tests/CircularBufferExtensionsTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2020 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import XCTest
import NIO
@testable import NIOHTTP2

final class CircularBufferExtensionsTests: XCTestCase {
func testPrependFromEmpty() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.prependWithoutExpanding(1)
XCTAssertEqual(Array(buffer), [1])
XCTAssertEqual(buffer.count, 1)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependWithSpace() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: [1, 2, 3])
buffer.prependWithoutExpanding(4)
XCTAssertEqual(Array(buffer), [4, 1, 2, 3])
XCTAssertEqual(buffer.count, 4)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependWithExactlyEnoughSpace() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: 1..<15)
buffer.prependWithoutExpanding(15)
XCTAssertEqual(Array(buffer), [15, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependWithoutEnoughSpace() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: 1..<16)
XCTAssertEqual(Array(buffer), [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)


buffer.prependWithoutExpanding(17)
XCTAssertEqual(Array(buffer), [17, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependContentsOfWithLotsOfSpace() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: [1, 2, 3])
XCTAssertEqual(Array(buffer), [1, 2, 3])
XCTAssertEqual(buffer.count, 3)
XCTAssertEqual(buffer.effectiveCapacity, 15)

buffer.prependWithoutExpanding(contentsOf: [4, 5, 6])
XCTAssertEqual(Array(buffer), [6, 5, 4, 1, 2, 3])
XCTAssertEqual(buffer.count, 6)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependContentsOfWithExactlyTheSpace() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: [1, 2, 3])
XCTAssertEqual(Array(buffer), [1, 2, 3])
XCTAssertEqual(buffer.count, 3)
XCTAssertEqual(buffer.effectiveCapacity, 15)

buffer.prependWithoutExpanding(contentsOf: Array(4..<16))
XCTAssertEqual(Array(buffer), [15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 1, 2, 3])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependContentsOfWithEnoughSpaceIfWeRemoveAnElement() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: [1, 2, 3])
XCTAssertEqual(Array(buffer), [1, 2, 3])
XCTAssertEqual(buffer.count, 3)
XCTAssertEqual(buffer.effectiveCapacity, 15)

buffer.prependWithoutExpanding(contentsOf: Array(4..<17))
XCTAssertEqual(Array(buffer), [16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 1, 2])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependContentsOfWithEnoughSpaceIfWeRemoveEverything() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: [1, 2, 3])
XCTAssertEqual(Array(buffer), [1, 2, 3])
XCTAssertEqual(buffer.count, 3)
XCTAssertEqual(buffer.effectiveCapacity, 15)

buffer.prependWithoutExpanding(contentsOf: Array(4..<19))
XCTAssertEqual(Array(buffer), [18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependContentsOfWithoutEnoughSpaceButContainingElements() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.append(contentsOf: [1, 2, 3])
XCTAssertEqual(Array(buffer), [1, 2, 3])
XCTAssertEqual(buffer.count, 3)
XCTAssertEqual(buffer.effectiveCapacity, 15)

buffer.prependWithoutExpanding(contentsOf: Array(4..<32))
XCTAssertEqual(Array(buffer), [31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependContentsOfWithExactlyTheSpaceFromEmpty() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.prependWithoutExpanding(contentsOf: Array(1..<16))
XCTAssertEqual(Array(buffer), [15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}

func testPrependContentsOfWithoutEnoughSpaceFromEmpty() {
var buffer = CircularBuffer<Int>(initialCapacity: 16)
buffer.prependWithoutExpanding(contentsOf: Array(1..<32))
XCTAssertEqual(Array(buffer), [31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17])
XCTAssertEqual(buffer.count, 15)
XCTAssertEqual(buffer.effectiveCapacity, 15)
}
}
2 changes: 2 additions & 0 deletions Tests/NIOHTTP2Tests/StreamMapTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ extension StreamMapTests {
("testCanFindElements", testCanFindElements),
("testRemoval", testRemoval),
("testModifySpecificValue", testModifySpecificValue),
("testDroppingAllStreamIDsGreaterThanLinear", testDroppingAllStreamIDsGreaterThanLinear),
("testDroppingAllStreamIDsGreaterThanBinarySearch", testDroppingAllStreamIDsGreaterThanBinarySearch),
]
}
}
Expand Down
Loading

0 comments on commit 06a2cb7

Please sign in to comment.