Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement StreamMap and replace Dictionary #258

Merged
merged 2 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ struct ConnectionStreamState {
/// The "safe" default value of SETTINGS_MAX_CONCURRENT_STREAMS.
static let defaultMaxConcurrentStreams: UInt32 = 100

fileprivate static let emptyStreamMap = StreamMap<HTTP2StreamStateMachine>.empty()

/// The underlying data storage for the HTTP/2 stream state.
private var activeStreams: [HTTP2StreamID: HTTP2StreamStateMachine]
private var activeStreams: StreamMap<HTTP2StreamStateMachine>

/// A collection of recently reset streams.
///
Expand Down Expand Up @@ -63,8 +65,7 @@ struct ConnectionStreamState {
}

init() {
// While there may be many concurrent streams, usually there will only be a small number.
self.activeStreams = Dictionary(minimumCapacity: 8)
self.activeStreams = StreamMap()
self.recentlyResetStreams = CircularBuffer(initialCapacity: self.maxResetStreams)
}

Expand All @@ -80,7 +81,7 @@ struct ConnectionStreamState {
mutating func createRemotelyPushedStream(streamID: HTTP2StreamID, remoteInitialWindowSize: UInt32) throws {
try self.reserveServerStreamID(streamID)
let streamState = HTTP2StreamStateMachine(receivedPushPromiseCreatingStreamID: streamID, remoteInitialWindowSize: remoteInitialWindowSize)
self.activeStreams[streamID] = streamState
self.activeStreams.insert(streamState)
}

/// Create stream state for a locally pushed stream.
Expand All @@ -95,23 +96,9 @@ struct ConnectionStreamState {
mutating func createLocallyPushedStream(streamID: HTTP2StreamID, localInitialWindowSize: UInt32) throws {
try self.reserveServerStreamID(streamID)
let streamState = HTTP2StreamStateMachine(sentPushPromiseCreatingStreamID: streamID, localInitialWindowSize: localInitialWindowSize)
self.activeStreams[streamID] = streamState
self.activeStreams.insert(streamState)
}

// These functions exist as a performance optimisation: by mutating the optional returned from Dictionary directly
// inline, we can avoid the dictionary needing to hash the key twice, which it would have to do if we removed the
// value, mutated it, and then re-inserted it.
//
// However, we need to be a bit careful here, as the performance gain from doing this would be completely swamped
// if the Swift compiler failed to inline this method into its caller. This would force the closure to have its
// context heap-allocated, and the cost of doing that is vastly higher than the cost of hashing the key a second
// time. So for this reason we make it clear to the compiler that these methods *must* be inlined at the call-site.
// Sorry about doing this!
//
// The mitigation for this is that these methods are only ever called by *very* small functions: basically functions
// that define the closure and then call these methods, and nothing else. So the cost of inlining this should be
// small.

/// Obtains a stream state machine in order to modify its state, potentially creating it if necessary.
///
/// The `creator` block will be called if the stream does not exist already. The `modifier` block will be called
Expand All @@ -125,7 +112,6 @@ struct ConnectionStreamState {
/// - modifier: A block that will be invoked to modify the stream state, if present.
/// - throws: Any errors thrown from the creator.
/// - returns: The result of the state modification, as well as any state change that occurred to the stream.
@inline(__always)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to remove this because otherwise the 5.0 compiler crashes. 🤷 We don't much care about perf on 5.0 anymore, it's not the best target, so we'll accept this change. Newer compilers didn't need this anyway.

mutating func modifyStreamStateCreateIfNeeded(streamID: HTTP2StreamID,
localRole: HTTP2StreamStateMachine.StreamRole,
localInitialWindowSize: UInt32,
Expand All @@ -141,15 +127,15 @@ struct ConnectionStreamState {
}

// FIXME(cory): This isn't ideal, but it's necessary to avoid issues with overlapping accesses on the activeStreams
// dictionary. The above closure takes a mutable copy of self, which is a big issue, so we should investigate whether
// map. The above closure takes a mutable copy of self, which is a big issue, so we should investigate whether
// it's possible for me to be smarter here.
var activeStreams: [HTTP2StreamID: HTTP2StreamStateMachine] = [:]
var activeStreams = ConnectionStreamState.emptyStreamMap
swap(&activeStreams, &self.activeStreams)
defer {
swap(&activeStreams, &self.activeStreams)
}

guard let result = try activeStreams[streamID].transformOrCreateAutoClose(creator, modifier) else {
guard let result = try activeStreams.transformOrCreateAutoClose(streamID: streamID, creator, modifier) else {
preconditionFailure("Stream was missing even though we should have created it!")
}

Expand All @@ -171,12 +157,11 @@ struct ConnectionStreamState {
/// - ignoreClosed: Whether a closed stream should be ignored. Should be set to `true` when receiving window update or reset stream frames.
/// - modifier: A block that will be invoked to modify the stream state, if present.
/// - returns: The result of the state modification, as well as any state change that occurred to the stream.
@inline(__always)
mutating func modifyStreamState(streamID: HTTP2StreamID,
ignoreRecentlyReset: Bool,
ignoreClosed: Bool = false,
_ modifier: (inout HTTP2StreamStateMachine) -> StateMachineResultWithStreamEffect) -> StateMachineResultWithStreamEffect {
guard let result = self.activeStreams[streamID].autoClosingTransform(modifier) else {
guard let result = self.activeStreams.autoClosingTransform(streamID: streamID, modifier) else {
return StateMachineResultWithStreamEffect(result: self.streamMissing(streamID: streamID, ignoreRecentlyReset: ignoreRecentlyReset, ignoreClosed: ignoreClosed), effect: nil)
}

Expand All @@ -201,7 +186,7 @@ struct ConnectionStreamState {
@inline(__always)
mutating func locallyResetStreamState(streamID: HTTP2StreamID,
_ modifier: (inout HTTP2StreamStateMachine) -> StateMachineResultWithStreamEffect) -> StateMachineResultWithStreamEffect {
guard let result = self.activeStreams[streamID].autoClosingTransform(modifier) else {
guard let result = self.activeStreams.autoClosingTransform(streamID: streamID, modifier) else {
// We never ignore recently reset streams here, as this should only ever be used when *sending* frames.
return StateMachineResultWithStreamEffect(result: self.streamMissing(streamID: streamID, ignoreRecentlyReset: false, ignoreClosed: false), effect: nil)
}
Expand Down Expand Up @@ -271,13 +256,13 @@ struct ConnectionStreamState {
mutating func dropAllStreamsWithIDHigherThan(_ streamID: HTTP2StreamID,
droppedLocally: Bool,
initiatedBy initiator: HTTP2ConnectionStateMachine.ConnectionRole) -> [HTTP2StreamID]? {
let idsToDrop = self.activeStreams.keys.filter { $0.mayBeInitiatedBy(initiator) && $0 > streamID }
let idsToDrop = self.activeStreams.elements(initiatedBy: initiator).drop(while: {$0.streamID <= streamID }).map { $0.streamID }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shame you couldn't get an optimal leap to start in here. eg Binary search or scan depending on size.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I can address this in a separate patch. I want to add a benchmark that actually hits this code-path, because right now we can't measure any improvement or change here.

guard idsToDrop.count > 0 else {
return nil
}

for closingStreamID in idsToDrop {
self.activeStreams.removeValue(forKey: closingStreamID)
self.activeStreams.removeValue(forStreamID: closingStreamID)

if droppedLocally {
self.recentlyResetStreams.prependWithoutExpanding(closingStreamID)
Expand Down Expand Up @@ -322,7 +307,7 @@ struct ConnectionStreamState {
}

private mutating func streamClosed(_ streamID: HTTP2StreamID) {
assert(!self.activeStreams.keys.contains(streamID))
assert(!self.activeStreams.contains(streamID: streamID))
if streamID.mayBeInitiatedBy(.client) {
self.clientStreamCount -= 1
} else {
Expand All @@ -344,23 +329,6 @@ private extension CircularBuffer {
}


internal extension Dictionary {
/// Calls a function once with each value of the dictionary, allowing the function
/// to mutate the value in-place in the dictionary.
///
/// As with the other block-taking functions in this module, this is @inline(__always) to ensure
/// that we don't end up actually heap-allocating a closure here. We're sorry about it!
@inline(__always)
mutating func mutatingForEachValue(_ body: (inout Value) throws -> Void) rethrows {
var index = self.startIndex
while index != self.endIndex {
try body(&self.values[index])
self.formIndex(after: &index)
}
}
}


extension StreamStateChange {
fileprivate var closedStream: Bool {
switch self {
Expand Down

This file was deleted.

Loading